Best practices and design patterns for building machine learning workflows with Amazon SageMaker Pipelines


Amazon SageMaker Pipelines is a fully managed AWS service for building and orchestrating machine learning (ML) workflows. SageMaker Pipelines offers ML application developers the ability to orchestrate different steps of the ML workflow, including data loading, data transformation, training, tuning, and deployment. You can use SageMaker Pipelines to orchestrate ML jobs in SageMaker, and its integration with the larger AWS ecosystem also lets you use resources like AWS Lambda functions, Amazon EMR jobs, and more. This enables you to build a customized and reproducible pipeline for the specific requirements of your ML workflows.

In this post, we provide some best practices to maximize the value of SageMaker Pipelines and make the development experience seamless. We also discuss some common design scenarios and patterns when building SageMaker Pipelines and provide examples for addressing them.

Best practices for SageMaker Pipelines

In this section, we discuss some best practices that can be followed while designing workflows using SageMaker Pipelines. Adopting them can improve the development process and streamline the operational management of SageMaker Pipelines.

Use Pipeline Session for lazy loading of the pipeline

Pipeline Session enables lazy initialization of pipeline resources (the jobs are not started until pipeline runtime). The PipelineSession context inherits the SageMaker Session and implements convenient methods for interacting with other SageMaker entities and resources, such as training jobs, endpoints, and input datasets in Amazon Simple Storage Service (Amazon S3). When defining SageMaker Pipelines, you should use PipelineSession over the regular SageMaker Session:

import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)
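
As a minimal sketch of the lazy behavior (the preprocess.py script path is a placeholder), calling run on a processor bound to a PipelineSession only returns step arguments; the processing job itself is created when the pipeline runs:

from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline

# With PipelineSession, .run() does not start a job; it returns step arguments
processor_args = sklearn_processor.run(
    code="code/preprocess.py",  # placeholder preprocessing script
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args=processor_args,
)

# The processing job is created only when the pipeline itself is executed
pipeline = Pipeline(
    name="MyLazyPipeline",
    steps=[step_process],
    sagemaker_session=pipeline_session,
)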

Run pipelines in local mode for cost-effective and fast iterations during development

You can run a pipeline in local mode using the LocalPipelineSession context. In this mode, the pipeline and jobs are run locally using resources on the local machine, instead of SageMaker managed resources. Local mode provides a cost-effective way to iterate on the pipeline code with a smaller subset of data. After the pipeline is tested locally, it can be scaled to run using the PipelineSession context.

import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession

local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()

sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=local_pipeline_session,
)
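
To run the pipeline end to end on the local machine, the processor can be wired into a pipeline that uses the same LocalPipelineSession; a short sketch, assuming a step_process built from the processor above:

from sagemaker.workflow.pipeline import Pipeline

# Pipelines created with a LocalPipelineSession run on the local machine
pipeline = Pipeline(
    name="local-pipeline-example",
    steps=[step_process],  # assumes a ProcessingStep built from sklearn_processor
    sagemaker_session=local_pipeline_session,
)

pipeline.create(role_arn=role)
execution = pipeline.start()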

Manage a SageMaker pipeline through versioning

Versioning of artifacts and pipeline definitions is a common requirement in the development lifecycle. You can create multiple versions of the pipeline by naming pipeline objects with a unique prefix or suffix, the most common being a timestamp, as shown in the following code:

import time

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()

pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)
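
The versioned pipeline can then be registered and started like any other; a brief sketch, assuming role holds the SageMaker execution role ARN:

# Create or update the pipeline definition under the timestamped name
pipeline.upsert(role_arn=role)

# Each start creates a new execution of this pipeline version
execution = pipeline.start()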

Organize and track SageMaker pipeline runs by integrating with SageMaker Experiments

SageMaker Pipelines can be easily integrated with SageMaker Experiments for organizing and tracking pipeline runs. This is achieved by specifying PipelineExperimentConfig at the time of creating a pipeline object. With this configuration object, you can specify an experiment name and a trial name. The run details of a SageMaker pipeline get organized under the specified experiment and trial. If you don't explicitly specify an experiment name, the pipeline name is used as the experiment name. Similarly, if you don't explicitly specify a trial name, the pipeline run ID is used for the trial or run group name. See the following code:

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

Pipeline(
    name="MyPipeline",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name=ExecutionVariables.PIPELINE_NAME,
        trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[...]
)

Securely run SageMaker pipelines within a private VPC

To secure the ML workloads, it's a best practice to deploy the jobs orchestrated by SageMaker Pipelines in a secure network configuration within a private VPC, private subnets, and security groups. To ensure and enforce the usage of this secure environment, you can implement the following AWS Identity and Access Management (IAM) policy for the SageMaker execution role (this is the role assumed by the pipeline during its run). You can also add the policy to run the jobs orchestrated by SageMaker Pipelines in network isolation mode.

# IAM policy to enforce execution within a private VPC

{
    "Action": [
        "sagemaker:CreateProcessingJob",
        "sagemaker:CreateTrainingJob",
        "sagemaker:CreateModel"
    ],
    "Resource": "*",
    "Effect": "Deny",
    "Condition": {
        "Null": {
            "sagemaker:VpcSubnets": "true"
        }
    }
}

# IAM policy to enforce execution in network isolation mode
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": [
                "sagemaker:Create*"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotEqualsIfExists": {
                    "sagemaker:NetworkIsolation": "true"
                }
            }
        }
    ]
}

For an example of a pipeline implementation with these security controls in place, refer to Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker in a secure environment.
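
To satisfy these policies, the jobs launched by each pipeline step need to carry a network configuration. The following is a minimal sketch using the SageMaker Python SDK's NetworkConfig; the subnet and security group IDs are placeholders, and role and pipeline_session are assumed to be defined as in the earlier examples:

from sagemaker.network import NetworkConfig
from sagemaker.sklearn.processing import SKLearnProcessor

# Placeholder subnet and security group IDs for the private VPC
network_config = NetworkConfig(
    enable_network_isolation=True,
    subnets=["subnet-0123456789abcdef0"],
    security_group_ids=["sg-0123456789abcdef0"],
)

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    role=role,
    sagemaker_session=pipeline_session,
    network_config=network_config,  # jobs created by this processor run inside the private VPC
)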

Track the cost of pipeline runs using tags

Using SageMaker Pipelines by itself is free; you pay for the compute and storage resources you spin up as part of the individual pipeline steps like processing, training, and batch inference. To aggregate the costs per pipeline run, you can include tags in every pipeline step that creates a resource. These tags can then be referenced in Cost Explorer to filter and aggregate the total pipeline run cost, as shown in the following example:

sklearn_processor = SKLearnProcessor(
    framework_version='0.20.0',
    instance_type='ml.m5.xlarge',
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    tags=[{'Key': 'pipeline-cost-tag', 'Value': '<<tag_parameter>>'}]
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    ...
)
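
One simple way to populate the tag value (a sketch, not the only option) is to resolve it when the pipeline definition is built, for example by reusing the timestamped pipeline name so that all resources created by that pipeline version share the same cost allocation tag:

# Illustrative: reuse the versioned pipeline name as the cost allocation tag value
cost_tags = [{"Key": "pipeline-cost-tag", "Value": pipeline_name}]

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    tags=cost_tags,
)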

From Cost Explorer, you can now get the cost filtered by the tag:

import boto3

# Cost Explorer client
client = boto3.client("ce")

response = client.get_cost_and_usage(
    TimePeriod={
        'Start': '2023-07-01',
        'End': '2023-07-15'
    },
    Metrics=['BLENDED_COST', 'USAGE_QUANTITY', 'UNBLENDED_COST'],
    Granularity='MONTHLY',
    Filter={
        'And': [
            {
                'Dimensions': {
                    'Key': 'USAGE_TYPE',
                    'Values': [
                        'SageMaker:Pipeline'
                    ]
                }
            },
            {
                'Tags': {
                    'Key': 'keyName',
                    'Values': [
                        'keyValue',
                    ]
                }
            }
        ]
    }
)

Design patterns for some common scenarios

In this section, we discuss design patterns for some common use cases with SageMaker Pipelines.

Run a lightweight Python function using a Lambda step

Python functions are omnipresent in ML workflows; they're used in preprocessing, postprocessing, evaluation, and more. Lambda is a serverless compute service that lets you run code without provisioning or managing servers. With Lambda, you can run code in your preferred language, including Python. You can use this to run custom Python code as part of your pipeline. A Lambda step enables you to run Lambda functions as part of your SageMaker pipeline. Start with the following code:

%%writefile lambdafunc.py

import json

def lambda_handler(event, context):
    str1 = event["str1"]
    str2 = event["str2"]
    str3 = str1 + str2
    return {
        "str3": str3
    }

Create the Lambda function using the SageMaker Python SDK's Lambda helper:

from sagemaker.lambda_helper import Lambda

def create_lambda(function_name, script, handler):
    response = Lambda(
        function_name=function_name,
        execution_role_arn=role,
        script=script,
        handler=handler,
        timeout=600,
        memory_size=10240,
    ).upsert()

    function_arn = response['FunctionArn']
    return function_arn

fn_arn = create_lambda("func", "lambdafunc.py", handler="lambdafunc.lambda_handler")

Call the Lambda step:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)

str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)

# Lambda Step
step_lambda1 = LambdaStep(
    title="LambdaStep1",
    lambda_func=Lambda(
        function_arn=fn_arn
    ),
    inputs={
        "str1": "Hey",
        "str2": " World"
    },
    outputs=[str3],
)

Pass data between steps

Input data for a pipeline step is either an accessible data location or data generated by one of the previous steps in the pipeline. You can provide this information as a ProcessingInput parameter. Let's look at a few scenarios of how you can use ProcessingInput.
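
For reference, a minimal sketch of the two flavors of ProcessingInput (the bucket, output name, and step names are illustrative):

from sagemaker.processing import ProcessingInput

# Static S3 location as the input to a step
static_input = ProcessingInput(
    source="s3://my-bucket/abalone/abalone-dataset.csv",  # illustrative bucket
    destination="/opt/ml/processing/input",
)

# Output of a previous processing step as the input to this step
chained_input = ProcessingInput(
    source=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    destination="/opt/ml/processing/input",
)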

Scenario 1: Pass the output (primitive data types) of a Lambda step to a processing step

Primitive data types refer to scalar data types like string, integer, Boolean, and float.

The following code snippet defines a Lambda function that returns a dictionary of variables with primitive data types. Your Lambda function code will return a JSON of key-value pairs when invoked from the Lambda step within the SageMaker pipeline.

def handler(event, context):
    ...
    return {
        "output1": "string_value",
        "output2": 1,
        "output3": True,
        "output4": 2.0,
    }

In the pipeline definition, you can then define SageMaker pipeline parameters of a specific data type and set the variable to the output of the Lambda function:

import sagemaker
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

# 1. Define the output parameters of the Lambda step

str_outputParam = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
int_outputParam = LambdaOutput(output_name="output2", output_type=LambdaOutputTypeEnum.Integer)
bool_outputParam = LambdaOutput(output_name="output3", output_type=LambdaOutputTypeEnum.Boolean)
float_outputParam = LambdaOutput(output_name="output4", output_type=LambdaOutputTypeEnum.Float)

# 2. Lambda step invoking the Lambda function and returning its output

step_lambda = LambdaStep(
    name="MyLambdaStep",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
        session=PipelineSession(),
    ),
    inputs={"arg1": "foo", "arg2": "foo1"},
    outputs=[
        str_outputParam, int_outputParam, bool_outputParam, float_outputParam
    ],
)

# 3. Extract the output of the Lambda

str_outputParam = step_lambda.properties.Outputs["output1"]

# 4. Use it in a subsequent step, for example a processing step

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
    role=role
)

processor_args = sklearn_processor.run(
    code="code/preprocess.py", #python script to run
    arguments=["--input-args", str_outputParam]
)

step_process = ProcessingStep(
    title="processstep1",
    step_args=processor_args,
)

Scenario 2: Pass the output (non-primitive data types) of a Lambda step to a processing step

Non-primitive data types refer to non-scalar data types (for example, NamedTuple). You may have a scenario where you have to return a non-primitive data type from a Lambda function. To do this, you have to convert your non-primitive data type to a string:

# Lambda function code returning a non-primitive data type

from collections import namedtuple

def lambda_handler(event, context):
    Outputs = namedtuple("Outputs", "sample_output")
    named_tuple = Outputs(
        [
            {'output1': 1, 'output2': 2},
            {'output3': 'foo', 'output4': 'foo1'}
        ]
    )
    return {
        "named_tuple_string": str(named_tuple)
    }

# Pipeline step that uses the Lambda output as a "Parameter Input"

output_ref = step_lambda.properties.Outputs["named_tuple_string"]

You can then use this string as an input to a subsequent step in the pipeline. To use the named tuple in the code, use eval() to parse the Python expression in the string:

# Decipher the string in your processing logic code

import argparse
from collections import namedtuple

Outputs = namedtuple("Outputs", "sample_output")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--named_tuple_string", sort=str, required=True)
    args = parser.parse_args()
    # use eval to obtain the named tuple from the string
    named_tuple = eval(args.named_tuple_string)

Scenario 3: Pass the output of a step through a property file

You can also store the output of a processing step in a property JSON file for downstream consumption in a ConditionStep or another ProcessingStep. You can use the JsonGet function to query a property file. See the following code:

import sagemaker
from sagemaker.processing import ProcessingOutput
from sagemaker.workflow.properties import PropertyFile

# 1. Define a processor with a ProcessingOutput
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="hyperparam",
            source="/opt/ml/processing/evaluation"
        ),
    ],
    code="./local/preprocess.py",
    arguments=["--input-data", "s3://my-input"],
)

# 2. Define a PropertyFile where the output_name matches the one used in the processor

hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)
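
For the property file to be queryable downstream, it also needs to be attached to the processing step; a minimal sketch under that assumption:

from sagemaker.workflow.steps import ProcessingStep

# Register the property file on the step so it can be queried later with JsonGet
step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args=step_args,
    property_files=[hyperparam_report],
)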

Let's assume the property file's contents were the following:

{
    "hyperparam": {
        "eta": {
            "worth": 0.6
        }
    }
}

In this case, it can be queried for a specific value and used in subsequent steps using the JsonGet function:

# 3. Query the property file
eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)
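
The queried value can then drive branching logic, for example in a ConditionStep; a short sketch (the threshold and the downstream step_train are illustrative):

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep

# Branch on the eta value extracted from the property file
cond_lte = ConditionLessThanOrEqualTo(left=eta, right=0.5)

step_cond = ConditionStep(
    name="CheckEtaCondition",
    conditions=[cond_lte],
    if_steps=[step_train],  # illustrative downstream step
    else_steps=[],
)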

Parameterize a variable in the pipeline definition

Parameterizing variables so that they can be used at runtime is often desirable, for example to construct an S3 URI. You can parameterize a string such that it's evaluated at runtime using the Join function. The following code snippet shows how to define the variable using the Join function and use it to set the output location in a processing step:

from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.execution_variables import ExecutionVariables

# Define the variable to store the S3 URI
s3_location = Join(
    on="/",
    values=[
        "s3:/",
        ParameterString(
            name="MyBucket",
            default_value=""
        ),
        "training",
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)

# Define the processing step
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# Use the S3 URI as the output location in the processing step
processor_run_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=s3_location,
        ),
    ],
    code="code/preprocess.py"
)

step_process = ProcessingStep(
    name="PreprocessingJob",
    step_args=processor_run_args,
)

Run parallel code over an iterable

Some ML workflows run code in parallel for-loops over a static set of items (an iterable). It can either be the same code that gets run on different data, or a different piece of code that needs to be run for each item. For example, if you have a very large number of rows in a file and want to speed up the processing time, you can rely on the former pattern. If you want to perform different transformations on specific sub-groups in the data, you might have to run a different piece of code for every sub-group in the data. The following two scenarios illustrate how you can design SageMaker pipelines for this purpose.

Scenario 1: Implement processing logic on different portions of data

You can run a processing job with multiple instances (by setting instance_count to a value greater than 1). This distributes the input data from Amazon S3 across all the processing instances. You can then use a script (process.py) to work on a specific portion of the data based on the instance number and the corresponding element in the list of items. The programming logic in process.py can be written such that a different module or piece of code gets run depending on the list of items that it processes. The following example defines a processor that can be used in a ProcessingStep:

import sagemaker
from sagemaker.processing import FrameworkProcessor, ProcessingInput

sklearn_processor = FrameworkProcessor(
    estimator_cls=sagemaker.sklearn.estimator.SKLearn,
    framework_version="0.23-1",
    instance_type="ml.m5.4xlarge",
    instance_count=4,  # number of parallel executions / instances
    base_job_name="parallel-step",
    sagemaker_session=session,
    role=role,
)

step_args = sklearn_processor.run(
    code="process.py",
    arguments=[
        "--items",
        list_of_items,  # data structure containing a list of items
    ],
    inputs=[
        ProcessingInput(
            source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
            destination="/opt/ml/processing/input"
        )
    ],
)
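
A sketch of how process.py could pick the portion of the item list it owns, assuming the items are passed as a comma-separated string and the processing container exposes its resource configuration at /opt/ml/config/resourceconfig.json:

# process.py (sketch): process only the slice of items assigned to this instance
import argparse
import json

RESOURCE_CONFIG = "/opt/ml/config/resourceconfig.json"

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--items", type=str, required=True)  # comma-separated items
    args = parser.parse_args()
    items = args.items.split(",")

    # Determine this instance's index among all processing hosts
    with open(RESOURCE_CONFIG) as f:
        config = json.load(f)
    hosts = sorted(config["hosts"])
    host_index = hosts.index(config["current_host"])

    # Work on every n-th item, where n is the number of instances
    my_items = items[host_index::len(hosts)]
    for item in my_items:
        # Placeholder for item-specific processing logic
        print(f"Processing {item} on {config['current_host']}")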

Scenario 2: Run a sequence of steps

When you have a sequence of steps that need to be run in parallel, you can define each sequence as an independent SageMaker pipeline. The run of these SageMaker pipelines can then be triggered from a Lambda function that is part of a LambdaStep in the parent pipeline. The following piece of code illustrates the scenario where two different SageMaker pipeline runs are triggered:

import boto3

def lambda_handler(event, context):
    items = [1, 2]
    # SageMaker client
    sm_client = boto3.client("sagemaker")

    # Name of the pipeline that needs to be triggered.
    # If there are multiple, you can fetch the available pipelines using the boto3 API
    # and trigger the appropriate one based on your logic.
    pipeline_name = "child-pipeline-1"

    # Trigger a pipeline run for every item
    response_ppl = sm_client.start_pipeline_execution(
        PipelineName=pipeline_name,
        PipelineExecutionDisplayName=pipeline_name + '-item-%d' % items[0],
    )

    pipeline_name = "child-pipeline-2"
    response_ppl = sm_client.start_pipeline_execution(
        PipelineName=pipeline_name,
        PipelineExecutionDisplayName=pipeline_name + '-item-%d' % items[1],
    )
    return
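
A sketch of attaching this trigger to the parent pipeline with a LambdaStep; the function ARN is a placeholder, and the Lambda execution role is assumed to have sagemaker:StartPipelineExecution permissions:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.workflow.pipeline import Pipeline

# Parent pipeline step that fans out to the child pipelines
step_trigger_children = LambdaStep(
    name="TriggerChildPipelines",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-east-1:123456789012:function:trigger-child-pipelines",  # placeholder
    ),
    inputs={},
)

parent_pipeline = Pipeline(
    name="parent-pipeline",
    steps=[step_trigger_children],
    sagemaker_session=pipeline_session,
)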

Conclusion

In this post, we discussed some best practices for the efficient use and maintenance of SageMaker pipelines. We also provided certain patterns that you can adopt while designing workflows with SageMaker Pipelines, whether you are authoring new pipelines or migrating ML workflows from other orchestration tools. To get started with SageMaker Pipelines for ML workflow orchestration, refer to the code samples on GitHub and Amazon SageMaker Model Building Pipelines.


About the Authors

Pinak Panigrahi works with customers to build machine learning driven solutions to solve strategic business problems on AWS. When not occupied with machine learning, he can be found taking a hike, reading a book, or watching sports.

Meenakshisundaram Thandavarayan works for AWS as an AI/ML Specialist. He has a passion to design, create, and promote human-centered data and analytics experiences. Meena focuses on developing sustainable systems that deliver measurable, competitive advantages for strategic customers of AWS. Meena is a connector, design thinker, and strives to drive business to new ways of working through innovation, incubation, and democratization.
