Skip to content

Implement some beam operators (w3) #400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/reference/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Check the index on the left for a more detailed description of any symbol.
| [`EventSet.equal()`][temporian.EventSet.equal] | Creates boolean features with event-wise equality to another `EventSet` or to a scalar value. |
| [`EventSet.experimental_fast_fourier_transform()`][temporian.EventSet.experimental_fast_fourier_transform] | Applies a Fast Fourier Transform. |
| [`EventSet.filter()`][temporian.EventSet.filter] | Filters out events in an [`EventSet`][temporian.EventSet] for which a condition is false. |
| [`EventSet.filter_empty_index()`][temporian.EventSet.filter_empty_index] | Filters out indexes without events. |
| [`EventSet.filter_moving_count()`][temporian.EventSet.filter_moving_count] | Skips events such that no more than one event is within a time window of `window_length`. |
| [`EventSet.isnan()`][temporian.EventSet.isnan] | Event-wise boolean that is `True` in the `NaN` positions of the input events. Equivalent to `~evset.notnan()`. |
| [`EventSet.join()`][temporian.EventSet.join] | Joins [`EventSets`][temporian.EventSet] with different samplings but the same index together. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: temporian.EventSet.filter_empty_index
13 changes: 4 additions & 9 deletions temporian/beam/io/tensorflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,22 @@ def f(example: "tensorflow.train.Example", key: str):
# Features
for feature_schema in self._schema.features:
src_value = dict_example[feature_schema.name]

if feature_schema.dtype in [
DType.BOOLEAN,
DType.INT32,
DType.INT64,
]:
f(ex, feature_schema.name).int64_list.value[
:
] = src_value = dict_example[feature_schema.name]
f(ex, feature_schema.name).int64_list.value[:] = src_value

elif feature_schema.dtype in [
DType.FLOAT32,
DType.FLOAT64,
]:
f(ex, feature_schema.name).float_list.value[
:
] = src_value = dict_example[feature_schema.name]
f(ex, feature_schema.name).float_list.value[:] = src_value

elif feature_schema.dtype == DType.STRING:
f(ex, feature_schema.name).bytes_list.value[
:
] = src_value = dict_example[feature_schema.name]
f(ex, feature_schema.name).bytes_list.value[:] = src_value

else:
raise ValueError("Non supported feature dtype")
Expand Down
140 changes: 140 additions & 0 deletions temporian/beam/operators/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ py_library(
":select",
":rename",
":prefix",
":leak",
":glue",
":filter",
":timestamps",
":filter_empty_index",
":cast",
":unique_timestamps",
":resample",
":drop_index",
":propagate",
"//temporian/beam/operators/scalar",
"//temporian/beam/operators/binary",
"//temporian/beam/operators/window:moving_count",
"//temporian/beam/operators/window:moving_max",
"//temporian/beam/operators/window:moving_min",
Expand Down Expand Up @@ -85,3 +97,131 @@ py_library(
"//temporian/core/operators:prefix",
],
)


py_library(
name = "leak",
srcs = ["leak.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:leak",
],
)

py_library(
name = "glue",
srcs = ["glue.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:glue",
],
)


py_library(
name = "filter",
srcs = ["filter.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:filter",
],
)


py_library(
name = "timestamps",
srcs = ["timestamps.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:timestamps",
],
)


py_library(
name = "filter_empty_index",
srcs = ["filter_empty_index.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:filter_empty_index",
],
)


py_library(
name = "cast",
srcs = ["cast.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:cast",
],
)


py_library(
name = "unique_timestamps",
srcs = ["unique_timestamps.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:unique_timestamps",
],
)


py_library(
name = "resample",
srcs = ["resample.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:resample",
],
)

py_library(
name = "drop_index",
srcs = ["drop_index.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:drop_index",
],
)


py_library(
name = "propagate",
srcs = ["propagate.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:implementation_lib",
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators:propagate",
],
)
12 changes: 12 additions & 0 deletions temporian/beam/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,15 @@
from temporian.beam.operators import add_index
from temporian.beam.operators import rename
from temporian.beam.operators import prefix
from temporian.beam.operators import leak
from temporian.beam.operators import glue
from temporian.beam.operators import filter
from temporian.beam.operators import timestamps
from temporian.beam.operators import filter_empty_index
from temporian.beam.operators import cast
from temporian.beam.operators import unique_timestamps
from temporian.beam.operators import resample
from temporian.beam.operators import drop_index
from temporian.beam.operators import propagate
from temporian.beam.operators.scalar import scalar
from temporian.beam.operators.binary import binary
23 changes: 19 additions & 4 deletions temporian/beam/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def __call__(self, **inputs: BeamEventSet) -> Dict[str, BeamEventSet]:


def beam_eventset_map(
src: BeamEventSet, name: str, fn: Callable[[FeatureItem, int], FeatureItem]
src: BeamEventSet,
name: str,
fn: Callable[[FeatureItem, int], Iterable[FeatureItem]],
) -> BeamEventSet:
"""Applies a function on each feature of a Beam eventset."""

Expand All @@ -56,7 +58,20 @@ def apply(idx, item):
return tuple([apply(idx, item) for idx, item in enumerate(src)])


def _extract_from_iterable(
def beam_eventset_flatmap(
src: BeamEventSet,
name: str,
fn: Callable[[FeatureItem, int], Iterable[FeatureItem]],
) -> BeamEventSet:
"""Applies a function on each feature of a Beam eventset."""

def apply(idx, item):
return item | f"Map on feature #{idx} {name}" >> beam.FlatMap(fn, idx)

return tuple([apply(idx, item) for idx, item in enumerate(src)])


def extract_from_iterable(
src: Iterable[FeatureItemValue],
) -> Optional[FeatureItemValue]:
for x in src:
Expand Down Expand Up @@ -85,8 +100,8 @@ def fn_on_cogroup(
idx: int,
) -> Iterator[FeatureItem]:
index, (it_feature, it_sampling) = item
feature = _extract_from_iterable(it_feature)
sampling = _extract_from_iterable(it_sampling)
feature = extract_from_iterable(it_feature)
sampling = extract_from_iterable(it_sampling)
if sampling is not None:
yield fn(index, feature, sampling, idx)

Expand Down
19 changes: 19 additions & 0 deletions temporian/beam/operators/binary/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package(
default_visibility = ["//visibility:public"],
licenses = ["notice"],
)

# Libraries
# =========

py_library(
name = "binary",
srcs = ["binary.py"],
srcs_version = "PY3",
deps = [
"//temporian/beam:typing",
"//temporian/beam/operators:base",
"//temporian/core/operators/binary:base",
"//temporian/implementation/numpy/operators/binary:base",
],
)
Empty file.
Loading