diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 2b6784a..42c763c 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -20,3 +20,5 @@ from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep +from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep + diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index a4195e6..5e74161 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -147,3 +147,156 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage' super(SqsSendMessageStep, self).__init__(state_id, **kwargs) + + +class EmrCreateClusterStep(Task): + """ + Creates a Task state to create and start running a cluster (job flow). See `Call Amazon EMR with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:createCluster.sync' + else: + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:createCluster' + + super(EmrCreateClusterStep, self).__init__(state_id, **kwargs) + + +class EmrTerminateClusterStep(Task): + """ + Creates a Task state to shut down a cluster (job flow). See `Call Amazon EMR with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:terminateCluster.sync' + else: + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:terminateCluster' + + super(EmrTerminateClusterStep, self).__init__(state_id, **kwargs) + + +class EmrAddStepStep(Task): + """ + Creates a Task state to add a new step to a running cluster. See `Call Amazon EMR with Step Functions `_ for more details. + """ + + def __init__(self, state_id, wait_for_completion=True, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) + """ + if wait_for_completion: + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:addStep.sync' + else: + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:addStep' + + super(EmrAddStepStep, self).__init__(state_id, **kwargs) + + +class EmrCancelStepStep(Task): + """ + Creates a Task state to cancel a pending step in a running cluster. See `Call Amazon EMR with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:cancelStep' + + super(EmrCancelStepStep, self).__init__(state_id, **kwargs) + + +class EmrSetClusterTerminationProtectionStep(Task): + """ + Creates a Task state to lock a cluster (job flow) so the EC2 instances in the cluster cannot be terminated by user intervention, an API call, or a job-flow error. See `Call Amazon EMR with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:setClusterTerminationProtection' + + super(EmrSetClusterTerminationProtectionStep, self).__init__(state_id, **kwargs) + + +class EmrModifyInstanceFleetByNameStep(Task): + """ + Creates a Task state to modify the target On-Demand and target Spot capacities for an instance fleet. See `Call Amazon EMR with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName' + + super(EmrModifyInstanceFleetByNameStep, self).__init__(state_id, **kwargs) + + +class EmrModifyInstanceGroupByNameStep(Task): + """ + Creates a Task state to modify the number of nodes and configuration settings of an instance group. See `Call Amazon EMR with Step Functions `_ for more details. + """ + + def __init__(self, state_id, **kwargs): + """ + Args: + state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + comment (str, optional): Human-readable comment or description. (default: None) + input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') + parameters (dict, optional): The value of this field becomes the effective input for the state. + result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') + output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') + """ + kwargs[Field.Resource.value] = 'arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName' + + super(EmrModifyInstanceGroupByNameStep, self).__init__(state_id, **kwargs) + diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index e5619b4..64809e9 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -16,6 +16,7 @@ from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep +from stepfunctions.steps.service import EmrCreateClusterStep, EmrTerminateClusterStep, EmrAddStepStep, EmrCancelStepStep, EmrSetClusterTerminationProtectionStep, EmrModifyInstanceFleetByNameStep, EmrModifyInstanceGroupByNameStep def test_sns_publish_step_creation(): @@ -200,3 +201,378 @@ def test_dynamodb_update_item_step_creation(): }, 'End': True } + + +def test_emr_create_cluster_step_creation(): + step = EmrCreateClusterStep('Create EMR cluster', parameters={ + 'Name': 'MyWorkflowCluster', + 'VisibleToAllUsers': True, + 'ReleaseLabel': 'emr-5.28.0', + 'Applications': [ + { + 'Name': 'Hive' + } + ], + 'ServiceRole': 'EMR_DefaultRole', + 'JobFlowRole': 'EMR_EC2_DefaultRole', + 'LogUri': 's3n://aws-logs-123456789012-us-east-1/elasticmapreduce/', + 'Instances': { + 'KeepJobFlowAliveWhenNoSteps': True, + 'InstanceFleets': [ + { + 'InstanceFleetType': 'MASTER', + 'Name': 'MASTER', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + }, + { + 'InstanceFleetType': 'CORE', + 'Name': 'CORE', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + } + ] + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:createCluster.sync', + 'Parameters': { + 'Name': 'MyWorkflowCluster', + 'VisibleToAllUsers': True, + 'ReleaseLabel': 'emr-5.28.0', + 'Applications': [ + { + 'Name': 'Hive' + } + ], + 'ServiceRole': 'EMR_DefaultRole', + 'JobFlowRole': 'EMR_EC2_DefaultRole', + 'LogUri': 's3n://aws-logs-123456789012-us-east-1/elasticmapreduce/', + 'Instances': { + 'KeepJobFlowAliveWhenNoSteps': True, + 'InstanceFleets': [ + { + 'InstanceFleetType': 'MASTER', + 'Name': 'MASTER', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + }, + { + 'InstanceFleetType': 'CORE', + 'Name': 'CORE', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + } + ] + } + }, + 'End': True + } + + step = EmrCreateClusterStep('Create EMR cluster', wait_for_completion=False, parameters={ + 'Name': 'MyWorkflowCluster', + 'VisibleToAllUsers': True, + 'ReleaseLabel': 'emr-5.28.0', + 'Applications': [ + { + 'Name': 'Hive' + } + ], + 'ServiceRole': 'EMR_DefaultRole', + 'JobFlowRole': 'EMR_EC2_DefaultRole', + 'LogUri': 's3n://aws-logs-123456789012-us-east-1/elasticmapreduce/', + 'Instances': { + 'KeepJobFlowAliveWhenNoSteps': True, + 'InstanceFleets': [ + { + 'InstanceFleetType': 'MASTER', + 'Name': 'MASTER', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + }, + { + 'InstanceFleetType': 'CORE', + 'Name': 'CORE', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + } + ] + } + }) + + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:createCluster', + 'Parameters': { + 'Name': 'MyWorkflowCluster', + 'VisibleToAllUsers': True, + 'ReleaseLabel': 'emr-5.28.0', + 'Applications': [ + { + 'Name': 'Hive' + } + ], + 'ServiceRole': 'EMR_DefaultRole', + 'JobFlowRole': 'EMR_EC2_DefaultRole', + 'LogUri': 's3n://aws-logs-123456789012-us-east-1/elasticmapreduce/', + 'Instances': { + 'KeepJobFlowAliveWhenNoSteps': True, + 'InstanceFleets': [ + { + 'InstanceFleetType': 'MASTER', + 'Name': 'MASTER', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + }, + { + 'InstanceFleetType': 'CORE', + 'Name': 'CORE', + 'TargetOnDemandCapacity': 1, + 'InstanceTypeConfigs': [ + { + 'InstanceType': 'm4.xlarge' + } + ] + } + ] + } + }, + 'End': True + } + + +def test_emr_terminate_cluster_step_creation(): + step = EmrTerminateClusterStep('Terminate EMR cluster', parameters={ + 'ClusterId': 'MyWorkflowClusterId' + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:terminateCluster.sync', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + }, + 'End': True + } + + step = EmrTerminateClusterStep('Terminate EMR cluster', wait_for_completion=False, parameters={ + 'ClusterId': 'MyWorkflowClusterId' + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:terminateCluster', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + }, + 'End': True + } + +def test_emr_add_step_step_creation(): + step = EmrAddStepStep('Add step to EMR cluster', parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'Step': { + 'Name': 'The first step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + 'hive-script', + '--run-hive-script', + '--args', + '-f', + 's3://.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q', + '-d', + 'INPUT=s3://.elasticmapreduce.samples', + '-d', + 'OUTPUT=s3:///MyHiveQueryResults/' + ] + } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:addStep.sync', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + 'Step': { + 'Name': 'The first step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + 'hive-script', + '--run-hive-script', + '--args', + '-f', + 's3://.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q', + '-d', + 'INPUT=s3://.elasticmapreduce.samples', + '-d', + 'OUTPUT=s3:///MyHiveQueryResults/' + ] + } + } + }, + 'End': True + } + + step = EmrAddStepStep('Add step to EMR cluster', wait_for_completion=False, parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'Step': { + 'Name': 'The first step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + 'hive-script', + '--run-hive-script', + '--args', + '-f', + 's3://.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q', + '-d', + 'INPUT=s3://.elasticmapreduce.samples', + '-d', + 'OUTPUT=s3:///MyHiveQueryResults/' + ] + } + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:addStep', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + 'Step': { + 'Name': 'The first step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + 'hive-script', + '--run-hive-script', + '--args', + '-f', + 's3://.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q', + '-d', + 'INPUT=s3://.elasticmapreduce.samples', + '-d', + 'OUTPUT=s3:///MyHiveQueryResults/' + ] + } + } + }, + 'End': True + } + +def test_emr_cancel_step_step_creation(): + step = EmrCancelStepStep('Cancel step from EMR cluster', parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'StepId': 'MyWorkflowStepId' + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:cancelStep', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + 'StepId': 'MyWorkflowStepId' + }, + 'End': True + } + +def test_emr_set_cluster_termination_protection_step_creation(): + step = EmrSetClusterTerminationProtectionStep('Set termination protection for EMR cluster', parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'TerminationProtected': True + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:setClusterTerminationProtection', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + 'TerminationProtected': True + }, + 'End': True + } + +def test_emr_modify_instance_fleet_by_name_step_creation(): + step = EmrModifyInstanceFleetByNameStep('Modify Instance Fleet by name for EMR cluster', parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceFleetName': 'MyCoreFleet', + 'InstanceFleet': { + 'TargetOnDemandCapacity': 8, + 'TargetSpotCapacity': 0 + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceFleetName': 'MyCoreFleet', + 'InstanceFleet': { + 'TargetOnDemandCapacity': 8, + 'TargetSpotCapacity': 0 + } + }, + 'End': True + } + +def test_emr_modify_instance_group_by_name_step_creation(): + step = EmrModifyInstanceGroupByNameStep('Modify Instance Group by name for EMR cluster', parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceGroupName': 'MyCoreGroup', + 'InstanceGroup': { + 'InstanceCount': 8 + } + }) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName', + 'Parameters': { + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceGroupName': 'MyCoreGroup', + 'InstanceGroup': { + 'InstanceCount': 8 + } + }, + 'End': True + } +