From 2da5569eb4f916ddc57e684703b46d5927ba73c0 Mon Sep 17 00:00:00 2001 From: Shunjia Ding Date: Tue, 17 Dec 2019 13:16:11 -0800 Subject: [PATCH 1/2] Add service integrations for SQS, SNS and DynamoDB --- src/stepfunctions/steps/__init__.py | 2 + src/stepfunctions/steps/service.py | 149 ++++++++++++++++++++ tests/unit/test_service_steps.py | 202 ++++++++++++++++++++++++++++ 3 files changed, 353 insertions(+) create mode 100644 src/stepfunctions/steps/service.py create mode 100644 tests/unit/test_service_steps.py diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 35cfdb8..b27df7b 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -18,3 +18,5 @@ from stepfunctions.steps.states import Graph, FrozenGraph from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep +from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessage diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py new file mode 100644 index 0000000..4995399 --- /dev/null +++ b/src/stepfunctions/steps/service.py @@ -0,0 +1,149 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +from __future__ import absolute_import + +from stepfunctions.steps.states import Task +from stepfunctions.steps.fields import Field + + +class DynamoDBGetItemStep(Task): + """ + Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:getItem' + super(DynamoDBGetItemStep, self).__init__(state_id, **kwargs) + + +class DynamoDBPutItemStep(Task): + + """ + Creates a Task state to put an item to DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:putItem' + super(DynamoDBPutItemStep, self).__init__(state_id, **kwargs) + + +class DynamoDBDeleteItemStep(Task): + + """ + Creates a Task state to delete an item from DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:deleteItem' + super(DynamoDBDeleteItemStep, self).__init__(state_id, **kwargs) + + +class DynamoDBUpdateItemStep(Task): + + """ + Creates a Task state to update an item from DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:updateItem' + super(DynamoDBUpdateItemStep, self).__init__(state_id, **kwargs) + + +class SnsPublishStep(Task): + + """ + Creates a Task state to publish a message to SNS topic. See `Call Amazon SNS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_callback=False, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + if wait_for_callback: + kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish.waitForTaskToken' + else: + kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish' + + super(SnsPublishStep, self).__init__(state_id, **kwargs) + + +class SqsSendMessage(Task): + + """ + Creates a Task state to send a message to SQS queue. See `Call Amazon SQS with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_callback=False, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False) + timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) + heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + if wait_for_callback: + kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage.waitForTaskToken' + else: + kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage' + + super(SqsSendMessage, self).__init__(state_id, **kwargs) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py new file mode 100644 index 0000000..2dd3b6f --- /dev/null +++ b/tests/unit/test_service_steps.py @@ -0,0 +1,202 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +from __future__ import absolute_import + +import pytest + +from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessage + + +def test_sns_publish_step_creation(): + step = SnsPublishStep('Publish to SNS', parameters={ + 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', + 'Message': 'message', + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::sns:publish', + 'Parameters': { + 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', + 'Message': 'message', + }, + 'End': True + } + + step = SnsPublishStep('Publish to SNS', wait_for_callback=True, parameters={ + 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', + 'Message': { + 'Input.$': '$', + 'TaskToken.$': '$$.Task.Token' + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::sns:publish.waitForTaskToken', + 'Parameters': { + 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', + 'Message': { + 'Input.$': '$', + 'TaskToken.$': '$$.Task.Token' + } + }, + 'End': True + } + + +def test_sqs_send_message_step_creation(): + step = SqsSendMessage('Send to SQS', parameters={ + 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', + 'MessageBody': 'Hello' + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::sqs:sendMessage', + 'Parameters': { + 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', + 'MessageBody': 'Hello' + }, + 'End': True + } + + step = SqsSendMessage('Send to SQS', wait_for_callback=True, parameters={ + 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', + 'MessageBody': { + 'Input.$': '$', + 'TaskToken.$': '$$.Task.Token' + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::sqs:sendMessage.waitForTaskToken', + 'Parameters': { + 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', + 'MessageBody': { + 'Input.$': '$', + 'TaskToken.$': '$$.Task.Token' + } + }, + 'End': True + } + + +def test_dynamodb_get_item_step_creation(): + step = DynamoDBGetItemStep('Read Message From DynamoDB', parameters={ + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Key': { + 'MessageId': { + 'S.$': '$.List[0]' + } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::dynamodb:getItem', + 'Parameters': { + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Key': { + 'MessageId': { + 'S.$': '$.List[0]' + } + } + }, + 'End': True + } + + +def test_dynamodb_put_item_step_creation(): + step = DynamoDBPutItemStep('Add Message From DynamoDB', parameters={ + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Item': { + 'MessageId': { + 'S': '123456789' + } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::dynamodb:putItem', + 'Parameters': { + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Item': { + 'MessageId': { + 'S': '123456789' + } + } + }, + 'End': True + } + + +def test_dynamodb_delete_item_step_creation(): + step = DynamoDBDeleteItemStep('Delete Message From DynamoDB', parameters={ + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Key': { + 'MessageId': { + 'S': 'MyMessage' + } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::dynamodb:deleteItem', + 'Parameters': { + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Key': { + 'MessageId': { + 'S': 'MyMessage' + } + } + }, + 'End': True + } + + +def test_dynamodb_update_item_step_creation(): + step = DynamoDBUpdateItemStep('Update Message From DynamoDB', parameters={ + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Key': { + 'RecordId': { + 'S': 'RecordId' + } + }, + 'UpdateExpression': 'set Revision = :val1', + 'ExpressionAttributeValues': { + ':val1': { 'S': '2' } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::dynamodb:updateItem', + 'Parameters': { + 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', + 'Key': { + 'RecordId': { + 'S': 'RecordId' + } + }, + 'UpdateExpression': 'set Revision = :val1', + 'ExpressionAttributeValues': { + ':val1': { 'S': '2' } + } + }, + 'End': True + } From 2485dd2914c38373d694673d799ba6c65cbfbe1f Mon Sep 17 00:00:00 2001 From: Shunjia Ding Date: Tue, 17 Dec 2019 14:47:41 -0800 Subject: [PATCH 2/2] Rename SqsSendMessage to SqsSendMessageStep --- src/stepfunctions/steps/__init__.py | 2 +- src/stepfunctions/steps/service.py | 4 ++-- tests/unit/test_service_steps.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index b27df7b..2b6784a 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -19,4 +19,4 @@ from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep -from stepfunctions.steps.service import SnsPublishStep, SqsSendMessage +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 4995399..a4195e6 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -122,7 +122,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): super(SnsPublishStep, self).__init__(state_id, **kwargs) -class SqsSendMessage(Task): +class SqsSendMessageStep(Task): """ Creates a Task state to send a message to SQS queue. See `Call Amazon SQS with Step Functions `_ for more details. @@ -146,4 +146,4 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): else: kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage' - super(SqsSendMessage, self).__init__(state_id, **kwargs) + super(SqsSendMessageStep, self).__init__(state_id, **kwargs) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 2dd3b6f..e5619b4 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -15,7 +15,7 @@ import pytest from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep -from stepfunctions.steps.service import SnsPublishStep, SqsSendMessage +from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep def test_sns_publish_step_creation(): @@ -57,7 +57,7 @@ def test_sns_publish_step_creation(): def test_sqs_send_message_step_creation(): - step = SqsSendMessage('Send to SQS', parameters={ + step = SqsSendMessageStep('Send to SQS', parameters={ 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', 'MessageBody': 'Hello' }) @@ -72,7 +72,7 @@ def test_sqs_send_message_step_creation(): 'End': True } - step = SqsSendMessage('Send to SQS', wait_for_callback=True, parameters={ + step = SqsSendMessageStep('Send to SQS', wait_for_callback=True, parameters={ 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', 'MessageBody': { 'Input.$': '$',