Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions proxy/prefect_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
reset_connection_streams,
update_connection_schema,
clear_connection,
cancel_connection_job,
)
from prefect_airbyte import AirbyteConnection, AirbyteServer
from prefect_airbyte.connections import ResetStream
Expand Down Expand Up @@ -92,6 +93,33 @@ def run_airbyte_conn_clear(payload: dict):
raise


# task config for cancelling a running airbyte job
# {
# type AIRBYTECONNECTION,
# slug: "airbyte-cancel"
# airbyte_server_block: str
# connection_id: str
# job_id: str
# }
@flow
def run_airbyte_cancel_job(payload: dict):
"""cancel a running airbyte job"""
try:
airbyte_server_block = payload["airbyte_server_block"]
serverblock = AirbyteServer.load(airbyte_server_block)
connection_block = AirbyteConnection(
airbyte_server=serverblock,
connection_id=payload["connection_id"],
)
result = cancel_connection_job(connection_block, payload["job_id"])
logger.info("airbyte job cancel result=")
logger.info(result)
return result
except Exception as error: # skipcq PYL-W0703
logger.error(str(error)) # "Job cancel failed."
raise

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation for required payload fields.

The flow implementation follows the established pattern but lacks input validation for required fields. Consider adding validation to prevent KeyError exceptions if required fields are missing from the payload.

@flow
def run_airbyte_cancel_job(payload: dict):
    """cancel a running airbyte job"""
    try:
+        # Validate required fields
+        required_fields = ["airbyte_server_block", "connection_id", "job_id"]
+        for field in required_fields:
+            if field not in payload:
+                raise ValueError(f"Missing required field: {field}")
+        
        airbyte_server_block = payload["airbyte_server_block"]
        serverblock = AirbyteServer.load(airbyte_server_block)
        connection_block = AirbyteConnection(
            airbyte_server=serverblock,
            connection_id=payload["connection_id"],
        )
        result = cancel_connection_job(connection_block, payload["job_id"])
        logger.info("airbyte job cancel result=")
        logger.info(result)
        return result
    except Exception as error:  # skipcq PYL-W0703
        logger.error(str(error))  # "Job cancel failed."
        raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# task config for cancelling a running airbyte job
# {
# type AIRBYTECONNECTION,
# slug: "airbyte-cancel"
# airbyte_server_block: str
# connection_id: str
# job_id: str
# }
@flow
def run_airbyte_cancel_job(payload: dict):
"""cancel a running airbyte job"""
try:
airbyte_server_block = payload["airbyte_server_block"]
serverblock = AirbyteServer.load(airbyte_server_block)
connection_block = AirbyteConnection(
airbyte_server=serverblock,
connection_id=payload["connection_id"],
)
result = cancel_connection_job(connection_block, payload["job_id"])
logger.info("airbyte job cancel result=")
logger.info(result)
return result
except Exception as error: # skipcq PYL-W0703
logger.error(str(error)) # "Job cancel failed."
raise
@flow
def run_airbyte_cancel_job(payload: dict):
"""cancel a running airbyte job"""
try:
# Validate required fields
required_fields = ["airbyte_server_block", "connection_id", "job_id"]
for field in required_fields:
if field not in payload:
raise ValueError(f"Missing required field: {field}")
airbyte_server_block = payload["airbyte_server_block"]
serverblock = AirbyteServer.load(airbyte_server_block)
connection_block = AirbyteConnection(
airbyte_server=serverblock,
connection_id=payload["connection_id"],
)
result = cancel_connection_job(connection_block, payload["job_id"])
logger.info("airbyte job cancel result=")
logger.info(result)
return result
except Exception as error: # skipcq PYL-W0703
logger.error(str(error)) # "Job cancel failed."
raise
🤖 Prompt for AI Agents
In proxy/prefect_flows.py around lines 96 to 121, the run_airbyte_cancel_job
flow lacks validation for required keys in the payload dictionary, which can
cause KeyError exceptions. Add explicit checks at the start of the function to
verify that "airbyte_server_block", "connection_id", and "job_id" keys exist in
the payload. If any are missing, raise a clear exception or handle the error
gracefully before proceeding with the rest of the flow logic.


@flow
def run_dbtcore_flow_v1(payload: dict):
# pylint: disable=broad-exception-caught
Expand Down Expand Up @@ -311,6 +339,9 @@ def deployment_schedule_flow_v4(
elif task_config["slug"] == "airbyte-clear":
run_airbyte_conn_clear(task_config)

elif task_config["slug"] == "airbyte-cancel":
run_airbyte_cancel_job(task_config)

elif task_config["slug"] == "update-schema":
asyncio.run(
run_refresh_schema_flow(
Expand Down
Loading