feat(taskworker): Zstd compress process profile task #95545

Merged: 3 commits, Jul 15, 2025
2 changes: 2 additions & 0 deletions src/sentry/profiles/task.py
@@ -48,6 +48,7 @@
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.taskworker.config import TaskworkerConfig
+from sentry.taskworker.constants import CompressionType
from sentry.taskworker.namespaces import ingest_profiling_tasks
from sentry.taskworker.retry import Retry
from sentry.utils import json, metrics
@@ -137,6 +138,7 @@ def encode_payload(message: dict[str, Any]) -> str:
times=2,
delay=5,
),
+compression_type=CompressionType.ZSTD,
Review comment (Member):
I think we'll end up double zipping here as the profiles consumer will do one zip, and then the taskworker system will zip again. This isn't ideal, but as an intermediary step it should be ok. We should test this locally to make sure we're not going to break the tasks.

),
)
def process_profile_task(
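Pieced together from the two hunks above, the registration of process_profile_task reads roughly as follows. This is a sketch, not the verbatim source: the task name and any decorator arguments outside the visible lines are assumptions, while instrumented_task, TaskworkerConfig, Retry, ingest_profiling_tasks, and CompressionType all come from the imports shown in the diff.

@instrumented_task(
    name="profiles.process",  # hypothetical; the real name is outside the hunk
    taskworker_config=TaskworkerConfig(
        namespace=ingest_profiling_tasks,
        retry=Retry(
            times=2,
            delay=5,
        ),
        compression_type=CompressionType.ZSTD,  # the line this PR adds
    ),
)
def process_profile_task(*args, **kwargs):
    ...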
103 changes: 102 additions & 1 deletion tests/sentry/profiles/test_task.py
@@ -7,6 +7,7 @@
from unittest import mock
from unittest.mock import patch

+import msgpack
import pytest
from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse
@@ -32,7 +33,7 @@
)
from sentry.profiles.utils import Profile
from sentry.signals import first_profile_received
-from sentry.testutils.cases import TransactionTestCase
+from sentry.testutils.cases import TestCase, TransactionTestCase
from sentry.testutils.factories import Factories, get_fixture_path
from sentry.testutils.helpers import Feature, override_options
from sentry.testutils.pytest.fixtures import django_db_all
@@ -1171,3 +1172,103 @@ def test_process_profile_task_should_flip_project_flag(
        )
        project.refresh_from_db()
        assert project.flags.has_profiles


class TestProcessProfileTaskDoubleCompression(TestCase):
    """
    TODO(taskworker): Remove this test once we have deleted zlib compression.
    Test class for validating the double compression flow:
    1. Consumer does zlib compression and calls process_profile_task.delay()
    2. Taskworker does zstd compression on the task parameters
    3. Taskworker decompresses zstd and the task decompresses zlib
    """

    @patch("sentry.profiles.task._track_outcome")
    @patch("sentry.profiles.task._track_duration_outcome")
    @patch("sentry.profiles.task._symbolicate_profile")
    @patch("sentry.profiles.task._deobfuscate_profile")
    @patch("sentry.profiles.task._push_profile_to_vroom")
    def test_consumer_to_task_double_compression_flow(
        self,
        _push_profile_to_vroom,
        _deobfuscate_profile,
        _symbolicate_profile,
        _track_duration_outcome,
        _track_outcome,
    ):
        """
        Test that the full consumer -> task flow works with double compression.

        This test validates:
        1. process_message in factory.py does zlib compression
        2. the taskworker layer does zstd compression
        3. both decompressions work correctly in the task execution
        """
        from datetime import datetime

        from arroyo.backends.kafka import KafkaPayload
        from arroyo.types import BrokerValue, Message, Partition, Topic
        from django.utils import timezone

        from sentry.profiles.consumers.process.factory import ProcessProfileStrategyFactory

        # Mock the task functions
        _push_profile_to_vroom.return_value = True
        _deobfuscate_profile.return_value = True
        _symbolicate_profile.return_value = True

        # Get the profile fixture data
        profile = generate_sample_v2_profile()

        # Create a message dict like the consumer would receive from Kafka
        message_dict = {
            "organization_id": self.organization.id,
            "project_id": self.project.id,
            "key_id": 1,
            "received": int(timezone.now().timestamp()),
            "payload": json.dumps(profile),
        }

        # Pack the message with msgpack (like the consumer receives from Kafka)
        payload = msgpack.packb(message_dict)

        # Create the processing strategy (this will call process_message)
        processing_strategy = ProcessProfileStrategyFactory().create_with_partitions(
            commit=mock.Mock(), partitions={}
        )

        # Use self.tasks() to run the actual task with both compression layers
        with self.tasks():
            # Submit the message to the processing strategy
            # This calls process_message which does:
            # 1. zlib compression of the msgpack data
            # 2. process_profile_task.delay() which adds zstd compression
            processing_strategy.submit(
                Message(
                    BrokerValue(
                        KafkaPayload(
                            b"key",
                            payload,
                            [],
                        ),
                        Partition(Topic("profiles"), 1),
                        1,
                        datetime.now(),
                    )
                )
            )
            processing_strategy.poll()
            processing_strategy.join(1)
            processing_strategy.terminate()

        # Verify the task was executed successfully
        assert _push_profile_to_vroom.call_count == 1
        assert _deobfuscate_profile.call_count == 1
        assert _symbolicate_profile.call_count == 1
        assert _track_duration_outcome.call_count == 1

        # Verify the profile was processed with correct data
        processed_profile = _push_profile_to_vroom.call_args[0][0]
        assert processed_profile["organization_id"] == self.organization.id
        assert processed_profile["project_id"] == self.project.id
        assert processed_profile["platform"] == profile["platform"]