diff --git a/docs/src/sdp/api.rst b/docs/src/sdp/api.rst index 4eecb933..d6c0b437 100644 --- a/docs/src/sdp/api.rst +++ b/docs/src/sdp/api.rst @@ -242,6 +242,9 @@ Data modifications .. autodata:: sdp.processors.LambdaExpression :annotation: + +.. autodata:: sdp.processors.ListToEntries + :annotation: Data filtering '''''''''''''' diff --git a/sdp/processors/__init__.py b/sdp/processors/__init__.py index 79f2205e..a4aa02f2 100644 --- a/sdp/processors/__init__.py +++ b/sdp/processors/__init__.py @@ -111,6 +111,7 @@ SubIfASRSubstitution, SubMakeLowercase, SubRegex, + ListToEntries, LambdaExpression, ) from sdp.processors.modify_manifest.data_to_dropbool import ( diff --git a/sdp/processors/modify_manifest/data_to_data.py b/sdp/processors/modify_manifest/data_to_data.py index 71a6ac90..defb69df 100644 --- a/sdp/processors/modify_manifest/data_to_data.py +++ b/sdp/processors/modify_manifest/data_to_data.py @@ -975,6 +975,138 @@ def process(self): logger.debug(f"Failed files: {self.failed_files}") +class ListToEntries(BaseParallelProcessor): + """ + A dataset processor that transforms a single entry containing a list of items into multiple entries, + one for each item in the list. + + This is useful when a manifest field (e.g., "segments") contains a list of sub-entries, and you want + to flatten these into individual records for further processing. + + Args: + field_with_list (str): The name of the field in the input entry that contains a list. + output_field (str, optional): The name of the output field to assign to items in the list + if they are not dictionaries. Required if the list contains primitive types (e.g., strings). + **kwargs: Additional arguments passed to the BaseParallelProcessor. + + Raises: + TypeError: If the specified list field is not of type list. + ValueError: If the list items are not dictionaries and `output_field` is not provided. + + Returns: + A manifest where each entry corresponds to one item in the original list from the input entry. + This effectively transforms a single input entry containing a list of items into multiple standalone + entries, each suitable for further dataset processing. + + .. admonition:: Example 1 (list of dicts) + + .. code-block:: yaml + + - _target_: sdp.processors.ListToEntries + input_manifest_file: ${workspace_dir}/input_manifest.json + output_manifest_file: ${workspace_dir}/output_manifest.json + field_with_list: "segments" + + Input:: + + { + "audio_filepath": "sample.wav", + "segments": [ + {"start": 0.0, "end": 1.5, "text": "Hello"}, + {"start": 1.6, "end": 3.0, "text": "World"} + ] + } + + Output:: + + [ + { + "audio_filepath": "sample.wav", + "start": 0.0, + "end": 1.5, + "text": "Hello" + }, + { + "audio_filepath": "sample.wav", + "start": 1.6, + "end": 3.0, + "text": "World" + } + ] + + .. admonition:: Example 2 (list of primitives) + + .. code-block:: yaml + + - _target_: sdp.processors.ListToEntries + input_manifest_file: ${workspace_dir}/input_manifest.json + output_manifest_file: ${workspace_dir}/output_manifest.json + field_with_list: "text_chunks" + output_field: "text" + + Input:: + + { + "audio_filepath": "sample.wav", + "text_chunks": [ + "Hello", + "World" + ] + } + + Output:: + + [ + { + "audio_filepath": "sample.wav", + "text": "Hello" + }, + { + "audio_filepath": "sample.wav", + "text": "World" + } + ] + + """ + + def __init__(self, + field_with_list: str, + output_field: str = None, + **kwargs): + super().__init__(**kwargs) + self.field_with_list = field_with_list + self.output_field = output_field + + def process_dataset_entry(self, data_entry): + _entries = [] + + # Check that the target field is actually a list + if not isinstance(data_entry[self.field_with_list], list): + raise TypeError(f'Values of {self.field_with_list} field should be list type only: {data_entry}') + + # Remove the list field from the entry and get the list of items + items_list = data_entry.pop(self.field_with_list) + + # If items are not dicts, output_field must be specified to store the item + if not isinstance(items_list[0], dict) and not self.output_field: + raise ValueError(f'Type of items in items list `{self.field_with_list}` is not dict ({type(items_list[0])}). In this case `output_field` should be provided.') + + # Expand the list into multiple entries + for item in items_list: + _entry = data_entry.copy() + + # If item is a dict, merge its keys; otherwise, store it in `output_field` + if isinstance(item, dict): + _entry.update(item) + else: + _entry[self.output_field] = item + + _entry = DataEntry(_entry) + _entries.append(_entry) + + return _entries + + class LambdaExpression(BaseParallelProcessor): """ A dataset processor that evaluates a Python expression on each data entry and either stores diff --git a/tests/test_data_to_data.py b/tests/test_data_to_data.py index 9f62f730..a18e40e8 100644 --- a/tests/test_data_to_data.py +++ b/tests/test_data_to_data.py @@ -19,6 +19,7 @@ SubIfASRSubstitution, SubMakeLowercase, SubRegex, + ListToEntries, LambdaExpression, ) @@ -91,6 +92,24 @@ ] ) +test_params_list.extend( + [ + # Test: list of dictionaries (e.g., segments) + ( + ListToEntries, + {"field_with_list": "segments"}, + {"audio_filepath": "a.wav", "segments": [{"start": 0.0, "end": 1.0, "text": "Hello"}, {"start": 1.1, "end": 2.0, "text": "World"}], "duration": 2.5}, + [{"audio_filepath": "a.wav", "duration": 2.5, "start": 0.0, "end": 1.0, "text": "Hello"}, {"audio_filepath": "a.wav", "duration": 2.5, "start": 1.1, "end": 2.0, "text": "World"}] + ), + # Test: list of primitive values (strings), requires output_field + ( + ListToEntries, + {"field_with_list": "text_chunks", "output_field": "text"}, + {"audio_filepath": "b.wav", "text_chunks": ["Привет", "Мир"], "lang": "ru"}, + [{"audio_filepath": "b.wav", "lang": "ru", "text": "Привет"}, {"audio_filepath": "b.wav", "lang": "ru", "text": "Мир"}] + ), + ] +) test_params_list.extend( [