Run secure processing jobs using PySpark in Amazon SageMaker Pipelines


Amazon SageMaker Studio can help you build, train, debug, deploy, and monitor your models and manage your machine learning (ML) workflows. Amazon SageMaker Pipelines enables you to build a secure, scalable, and flexible MLOps platform within Studio.

In this post, we explain how to run PySpark processing jobs within a pipeline. This enables anyone who wants to train a model using Pipelines to also preprocess training data, postprocess inference data, or evaluate models using PySpark. This capability is especially relevant when you need to process large-scale data. In addition, we showcase how to optimize your PySpark steps using configurations and Spark UI logs.

Pipelines is an Amazon SageMaker tool for building and managing end-to-end ML pipelines. It’s a fully managed on-demand service, integrated with SageMaker and other AWS services, and therefore creates and manages resources for you. This ensures that instances are only provisioned and used when running the pipelines. Furthermore, Pipelines is supported by the SageMaker Python SDK, letting you track your data lineage and reuse steps by caching them to reduce development time and cost. A SageMaker pipeline can use processing steps to process data or perform model evaluation.

When processing large-scale data, data scientists and ML engineers often use PySpark, an interface for Apache Spark in Python. SageMaker provides prebuilt Docker images that include PySpark and other dependencies needed to run distributed data processing jobs, including data transformations and feature engineering using the Spark framework. Although those images allow you to quickly start using PySpark in processing jobs, large-scale data processing often requires specific Spark configurations in order to optimize the distributed computing of the cluster created by SageMaker.

In our example, we create a SageMaker pipeline running a single processing step. For more information about what other steps you can add to a pipeline, refer to Pipeline Steps.

SageMaker Processing library

SageMaker Processing can run with specific frameworks (for example, SKLearnProcessor, PySparkProcessor, or Hugging Face). Independent of the framework used, each ProcessingStep requires the following:

  • Step name – The name to be used for your SageMaker pipeline step
  • Step arguments – The arguments for your ProcessingStep

Additionally, you can provide the following:

  • The configuration for your step cache in order to avoid unnecessary runs of your step in a SageMaker pipeline
  • A list of step names, step instances, or step collection instances that the ProcessingStep depends on
  • The display name of the ProcessingStep
  • A description of the ProcessingStep
  • Property files
  • Retry policies

The arguments are handed over to the ProcessingStep. You can use the sagemaker.spark.PySparkProcessor or sagemaker.spark.SparkJarProcessor class to run your Spark application inside of a processing job.

Each processor comes with its own needs, depending on the framework. This is best illustrated using the PySparkProcessor, where you can pass additional information to optimize the ProcessingStep further, for instance via the configuration parameter when running your job.

Run SageMaker Processing jobs in a secure environment

It’s best practice to create a private Amazon VPC and configure it so that your jobs aren’t accessible over the public internet. SageMaker Processing jobs allow you to specify the private subnets and security groups in your VPC using the NetworkConfig.VpcConfig request parameter of the CreateProcessingJob API, and to enable network isolation and inter-container traffic encryption through the other fields of NetworkConfig. We provide examples of this configuration using the SageMaker SDK in the next section.
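
To make the shape of that request parameter concrete, the following minimal sketch assembles a NetworkConfig dictionary in plain Python. The field names follow the CreateProcessingJob API; the helper name build_network_config and the subnet and security group IDs are hypothetical placeholders, not values from this post.

```python
def build_network_config(subnet_ids, security_group_ids,
                         isolate_network=False, encrypt_traffic=True):
    """Return a dict shaped like the NetworkConfig field of CreateProcessingJob.

    Hypothetical helper for illustration only; it does not call any AWS API.
    """
    if not subnet_ids or not security_group_ids:
        raise ValueError("At least one subnet and one security group are required")
    return {
        "EnableNetworkIsolation": isolate_network,
        "EnableInterContainerTrafficEncryption": encrypt_traffic,
        "VpcConfig": {
            "Subnets": list(subnet_ids),
            "SecurityGroupIds": list(security_group_ids),
        },
    }


# Placeholder IDs, assumed for the example
network_config = build_network_config(["subnet-aaaa1111"], ["sg-bbbb2222"])
print(network_config)
```

In the SageMaker Python SDK, the same settings are carried by the sagemaker.network.NetworkConfig class, which is what a processor's network_config argument expects.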

PySpark ProcessingStep within SageMaker Pipelines

For this example, we assume that you have Studio deployed in a secure environment already available, including VPC, VPC endpoints, security groups, AWS Identity and Access Management (IAM) roles, and AWS Key Management Service (AWS KMS) keys. We also assume that you have two buckets: one for artifacts like code and logs, and one for your data. The basic_infra.yaml file provides example AWS CloudFormation code to provision the necessary prerequisite infrastructure. The example code and deployment guide are also available on GitHub.

As an example, we set up a pipeline containing a single ProcessingStep in which we’re simply reading and writing the abalone dataset using Spark. The code samples show you how to set up and configure the ProcessingStep.

We define parameters for the pipeline (name, role, buckets, and so on) and step-specific settings (instance type and count, framework version, and so on). In this example, we use a secure setup and also define subnets, security groups, and the inter-container traffic encryption. For this example, you need a pipeline execution role with SageMaker full access and a VPC. See the following code:

{
    "pipeline_name": "ProcessingPipeline",
    "trial": "test-blog-post",
    "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>",
    "network_subnet_ids": [
        "subnet-<SUBNET_ID>",
        "subnet-<SUBNET_ID>"
    ],
    "network_security_group_ids": [
        "sg-<SG_ID>"
    ],
    "pyspark_process_volume_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
    "pyspark_process_output_kms": "arn:aws:kms:<REGION_NAME>:<ACCOUNT_NUMBER>:key/<KMS_KEY_ID>",
    "pyspark_helper_code": "s3://<INFRA_S3_BUCKET>/src/helper/data_utils.py",
    "spark_config_file": "s3://<INFRA_S3_BUCKET>/src/spark_configuration/configuration.json",
    "pyspark_process_code": "s3://<INFRA_S3_BUCKET>/src/processing/process_pyspark.py",
    "process_spark_ui_log_output": "s3://<DATA_S3_BUCKET>/spark_ui_logs/{}",
    "pyspark_framework_version": "2.4",
    "pyspark_process_name": "pyspark-processing",
    "pyspark_process_data_input": "s3a://<DATA_S3_BUCKET>/data_input/abalone_data.csv",
    "pyspark_process_data_output": "s3a://<DATA_S3_BUCKET>/pyspark/data_output",
    "pyspark_process_instance_type": "ml.m5.4xlarge",
    "pyspark_process_instance_count": 6,
    "tags": {
        "Project": "tag-for-project",
        "Owner": "tag-for-owner"
    }
}
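
Because the parameter file ships with angle-bracket placeholders, a quick check before building the pipeline can catch values that were never filled in. The following is a sketch only: the function find_param_problems, the choice of required keys, and the bucket name in the sample are assumptions made for illustration and are not part of the example repository.

```python
REQUIRED_KEYS = (
    "pipeline_name",
    "pipeline_role",
    "network_subnet_ids",
    "network_security_group_ids",
    "pyspark_process_code",
    "pyspark_process_instance_type",
    "pyspark_process_instance_count",
)


def _strings_in(value):
    """Yield every string inside a value that may be a str, list, or dict."""
    if isinstance(value, str):
        yield value
    elif isinstance(value, list):
        for item in value:
            yield from _strings_in(item)
    elif isinstance(value, dict):
        for item in value.values():
            yield from _strings_in(item)


def find_param_problems(params):
    """Return human-readable problems found in a pipeline parameter dict."""
    problems = []
    for key in REQUIRED_KEYS:
        if key not in params:
            problems.append(f"missing key: {key}")
    for key, value in params.items():
        # A value such as "<ACCOUNT_NUMBER>" means the template was not filled in
        if any("<" in s and ">" in s for s in _strings_in(value)):
            problems.append(f"unfilled placeholder in: {key}")
    count = params.get("pyspark_process_instance_count")
    if count is not None and (not isinstance(count, int) or count < 1):
        problems.append("pyspark_process_instance_count must be a positive integer")
    return problems


# Sample input with one leftover placeholder and an invalid instance count
example = {
    "pipeline_name": "ProcessingPipeline",
    "pipeline_role": "arn:aws:iam::<ACCOUNT_NUMBER>:role/<PIPELINE_EXECUTION_ROLE_NAME>",
    "network_subnet_ids": ["subnet-0123"],
    "network_security_group_ids": ["sg-0123"],
    "pyspark_process_code": "s3://example-infra-bucket/src/processing/process_pyspark.py",
    "pyspark_process_instance_type": "ml.m5.4xlarge",
    "pyspark_process_instance_count": 0,
}
problems = find_param_problems(example)
print(problems)
```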

To demonstrate, the following code example runs a PySpark script on SageMaker Processing within a pipeline by using the PySparkProcessor:

# import code requirements
# standard libraries import
import logging
import json

# sagemaker module imports
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.steps import CacheConfig
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor

from helpers.infra.networking.networking import get_network_configuration
from helpers.infra.tags.tags import get_tags_input
from helpers.pipeline_utils import get_pipeline_config

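def create_pipeline(pipeline_params, logger):
    """
    Create (or update) the SageMaker pipeline with a single PySpark processing step.

    Args:
        pipeline_params (dict): pipeline parameters loaded from pipeline_params.json
        logger (logging.Logger): logger
    Returns:
        sagemaker.workflow.pipeline.Pipeline: the upserted pipeline
    """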
    # Create SageMaker Session
    sagemaker_session = PipelineSession()

    # Get Tags
    tags_input = get_tags_input(pipeline_params["tags"])

    # get network configuration
    network_config = get_network_configuration(
        subnets=pipeline_params["network_subnet_ids"],
        security_group_ids=pipeline_params["network_security_group_ids"]
    )

    # Get Pipeline Configurations
    pipeline_config = get_pipeline_config(pipeline_params)

    # setting processing cache obj
    logger.info("Setting " + pipeline_params["pyspark_process_name"] + " cache configuration to 30 days")
    cache_config = CacheConfig(enable_caching=True, expire_after="p30d")

    # Create PySpark Processing Step
    logger.info("Creating " + pipeline_params["pyspark_process_name"] + " processor")

    # setting up spark processor
    processing_pyspark_processor = PySparkProcessor(
        base_job_name=pipeline_params["pyspark_process_name"],
        framework_version=pipeline_params["pyspark_framework_version"],
        role=pipeline_params["pipeline_role"],
        instance_count=pipeline_params["pyspark_process_instance_count"],
        instance_type=pipeline_params["pyspark_process_instance_type"],
        volume_kms_key=pipeline_params["pyspark_process_volume_kms"],
        output_kms_key=pipeline_params["pyspark_process_output_kms"],
        network_config=network_config,
        tags=tags_input,
        sagemaker_session=sagemaker_session
    )
    
    # setting up arguments
    run_ags = processing_pyspark_processor.run(
        submit_app=pipeline_params["pyspark_process_code"],
        submit_py_files=[pipeline_params["pyspark_helper_code"]],
        arguments=[
        # processing input arguments. To add new arguments to this list you need to provide two entries:
        # 1st is the argument name preceded by "--" and the 2nd is the argument value
        # setting up processing arguments
            "--input_table", pipeline_params["pyspark_process_data_input"],
            "--output_table", pipeline_params["pyspark_process_data_output"]
        ],
        spark_event_logs_s3_uri=pipeline_params["process_spark_ui_log_output"].format(pipeline_params["trial"]),
        inputs = [
            ProcessingInput(
                source=pipeline_params["spark_config_file"],
                destination="/opt/ml/processing/input/conf",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                s3_data_distribution_type="FullyReplicated",
                s3_compression_type="None"
            )
        ],
    )

    # create step
    pyspark_processing_step = ProcessingStep(
        name=pipeline_params["pyspark_process_name"],
        step_args=run_ags,
        cache_config=cache_config,
    )

    # Create Pipeline
    pipeline = Pipeline(
        name=pipeline_params["pipeline_name"],
        steps=[
            pyspark_processing_step
        ],
        pipeline_experiment_config=PipelineExperimentConfig(
            pipeline_params["pipeline_name"],
            pipeline_config["trial"]
        ),
        sagemaker_session=sagemaker_session
    )
    pipeline.upsert(
        role_arn=pipeline_params["pipeline_role"],
        description="Example pipeline",
        tags=tags_input
    )
    return pipeline


def main():
    # set up logging; basicConfig attaches a handler so INFO messages are emitted
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    logger.info("Get Pipeline Parameter")

    with open("ml_pipeline/params/pipeline_params.json", "r") as f:
        pipeline_params = json.load(f)
    print(pipeline_params)

    logger.info("Create Pipeline")
    pipeline = create_pipeline(pipeline_params, logger=logger)
    logger.info("Execute Pipeline")
    execution = pipeline.start()
    return execution


if __name__ == "__main__":
    main()

As shown in the preceding code, we’re overwriting the default Spark configurations by providing configuration.json as a ProcessingInput. We use a configuration.json file that was saved in Amazon Simple Storage Service (Amazon S3) with the following settings:

[
    {
        "Classification":"spark-defaults",
        "Properties":{
            "spark.executor.memory":"10g",
            "spark.executor.memoryOverhead":"5g",
            "spark.driver.memory":"10g",
            "spark.driver.memoryOverhead":"10g",
            "spark.driver.maxResultSize":"10g",
            "spark.executor.cores":5,
            "spark.executor.instances":5,
            "spark.yarn.maxAppAttempts":1,
            "spark.hadoop.fs.s3a.endpoint":"s3.<region>.amazonaws.com",
            "spark.sql.parquet.fs.optimized.committer.optimization-enabled":true
        }
    }
]

We can update the default Spark configuration either by passing the file as a ProcessingInput or by using the configuration argument when running the run() function.
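
As a sketch of the second option, the same classification list can be assembled in Python and serialized, so that one object can either be written to configuration.json or handed to the configuration argument of run(). The helper name build_spark_configuration and the chosen subset of properties are assumptions for illustration; property values are written as strings here, which is an assumption about what the container accepts rather than something this post confirms.

```python
import json


def build_spark_configuration(executor_memory_gb, executor_cores,
                              executor_instances, driver_memory_gb):
    """Build an EMR-style 'spark-defaults' classification list (illustrative helper)."""
    return [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.executor.memory": f"{executor_memory_gb}g",
                "spark.executor.cores": str(executor_cores),
                "spark.executor.instances": str(executor_instances),
                "spark.driver.memory": f"{driver_memory_gb}g",
            },
        }
    ]


configuration = build_spark_configuration(
    executor_memory_gb=10, executor_cores=5, executor_instances=5, driver_memory_gb=10
)

# The serialized form is what would be stored as configuration.json
serialized = json.dumps(configuration, indent=4)
print(serialized)
```

Keeping the configuration as a Python object makes it easier to derive values from the instance type and count instead of hard-coding them in a file.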

The Spark configuration depends on other options, like the instance type and instance count chosen for the processing job. The first consideration is the number of instances, the vCPU cores that each of those instances has, and the instance memory. You can use Spark UIs or CloudWatch instance metrics and logs to calibrate these values over multiple run iterations.

In addition, the executor and driver settings can be optimized even further. For an example of how to calculate these, refer to Best practices for successfully managing memory for Apache Spark applications on Amazon EMR.
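
The arithmetic behind such a calculation can be sketched as follows. This is a rough starting heuristic loosely modeled on the approach in that EMR guidance, not a rule from this post: it assumes 5 cores per executor, reserves 1 vCPU and 1 GiB per instance for system processes, splits executor memory roughly 90/10 between heap and overhead, and leaves one executor slot for the driver. The function name and defaults are hypothetical; the example input uses the 16 vCPUs and 64 GiB of an ml.m5.4xlarge and the instance count of 6 from the parameter file.

```python
def suggest_executor_settings(vcpus, memory_gib, instance_count, cores_per_executor=5):
    """Derive starting-point Spark executor settings from instance specs (heuristic)."""
    # Reserve one vCPU per instance for the OS and cluster daemons (assumption)
    executors_per_instance = (vcpus - 1) // cores_per_executor
    if executors_per_instance < 1:
        raise ValueError("Instance has too few vCPUs for the requested executor size")

    # Reserve 1 GiB per instance, then share the rest between executors (assumption)
    memory_per_executor = (memory_gib - 1) // executors_per_instance

    # Roughly 90% heap; the remainder becomes memory overhead
    heap_gib = memory_per_executor * 9 // 10
    overhead_gib = memory_per_executor - heap_gib

    # Leave one executor slot in the cluster for the driver
    executor_instances = executors_per_instance * instance_count - 1

    return {
        "spark.executor.cores": cores_per_executor,
        "spark.executor.memory": f"{heap_gib}g",
        "spark.executor.memoryOverhead": f"{overhead_gib}g",
        "spark.executor.instances": executor_instances,
    }


settings = suggest_executor_settings(vcpus=16, memory_gib=64, instance_count=6)
print(settings)
```

Values obtained this way are only a first guess; the Spark UI and CloudWatch metrics remain the basis for tuning them over several runs.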

Beyond the driver and executor settings, we recommend investigating the committer settings to improve performance when writing to Amazon S3. In our case, we’re writing Parquet files to Amazon S3 and setting “spark.sql.parquet.fs.optimized.committer.optimization-enabled” to true.

If needed for a connection to Amazon S3, a regional endpoint “spark.hadoop.fs.s3a.endpoint” can be specified within the configuration file.

In this example pipeline, the PySpark script (process_pyspark.py in the pipeline parameters, as shown in the following code) loads a CSV file from Amazon S3 into a Spark data frame, and saves the data as Parquet back to Amazon S3.

Note that our example configuration is not proportionate to the workload because reading and writing the abalone dataset could be done on default settings on one instance. The configurations we mentioned should be defined based on your specific needs.

# import necessities
import argparse
import logging
import sys
import os
import pandas as pd

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (udf, col)
from pyspark.sql.types import StringType, StructField, StructType, FloatType

from data_utils import(
    spark_read_parquet,
    Unbuffered
)

sys.stdout = Unbuffered(sys.stdout)

# Define custom handler
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def main(data_path):

    spark = SparkSession.builder.appName("PySparkJob").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # explicit schema for the input CSV, which has no header row
    schema = StructType(
        [
            StructField("sex", StringType(), True),
            StructField("length", FloatType(), True),
            StructField("diameter", FloatType(), True),
            StructField("height", FloatType(), True),
            StructField("whole_weight", FloatType(), True),
            StructField("shucked_weight", FloatType(), True),
            StructField("viscera_weight", FloatType(), True),
            StructField("rings", FloatType(), True),
        ]
    )

    df = spark.read.csv(data_path, header=False, schema=schema)
    return df.select("sex", "length", "diameter", "rings")

if __name__ == "__main__":
    logger.info("===============================================================")
    logger.info("================= Starting pyspark-processing =================")
    parser = argparse.ArgumentParser(description="app inputs")
    parser.add_argument("--input_table", type=str, help="path to the channel data")
    parser.add_argument("--output_table", type=str, help="path to the output data")
    args = parser.parse_args()
    
    df = main(args.input_table)

    logger.info("Writing transformed data")
    df.write.csv(os.path.join(args.output_table, "transformed.csv"), header=True, mode="overwrite")

    # save data
    df.coalesce(10).write.mode("overwrite").parquet(args.output_table)

    logger.info("================== Ending pyspark-processing ==================")
    logger.info("===============================================================")
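
The link between the arguments list passed to run() and the parser in this script can be illustrated in isolation. The sketch below parses a list built the same way as in the pipeline code (flag name followed by its value); the function name parse_job_arguments and the bucket name are hypothetical and only serve the example.

```python
import argparse


def parse_job_arguments(argv):
    """Parse the flag/value pairs that the processing step passes to the script."""
    parser = argparse.ArgumentParser(description="app inputs")
    parser.add_argument("--input_table", type=str, help="path to the channel data")
    parser.add_argument("--output_table", type=str, help="path to the output data")
    return parser.parse_args(argv)


# Same layout as the `arguments` list of the processing step:
# each flag is immediately followed by its value (bucket name is a placeholder)
arguments = [
    "--input_table", "s3a://example-data-bucket/data_input/abalone_data.csv",
    "--output_table", "s3a://example-data-bucket/pyspark/data_output",
]

args = parse_job_arguments(arguments)
print(args.input_table)
print(args.output_table)
```

Adding a new script parameter therefore takes two changes: one more add_argument call in the script and one more flag/value pair in the arguments list of the step.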

To dive into optimizing Spark processing jobs, you can use the CloudWatch logs as well as the Spark UI. You can create the Spark UI by running a Processing job on a SageMaker notebook instance. You can view the Spark UI for the Processing jobs running within a pipeline by running the history server within a SageMaker notebook instance if the Spark UI logs were saved within the same Amazon S3 location.
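
Since the history server has to read from the exact prefix the job wrote to, it can help to derive that location from the same template and trial name used by the pipeline. The following sketch shows the idea with plain string handling; the function names and the bucket name are assumptions for illustration.

```python
def spark_event_log_uri(template, trial):
    """Fill the '{}' placeholder of the log-output template with the trial name."""
    if "{}" not in template:
        raise ValueError("Template must contain a '{}' placeholder for the trial name")
    return template.format(trial)


def same_log_location(uri_a, uri_b):
    """Compare two S3 URIs while ignoring a trailing slash."""
    return uri_a.rstrip("/") == uri_b.rstrip("/")


# Template mirrors process_spark_ui_log_output; the bucket name is a placeholder
template = "s3://example-data-bucket/spark_ui_logs/{}"
job_log_uri = spark_event_log_uri(template, "test-blog-post")
print(job_log_uri)

# The URI given to the history server should resolve to the same prefix
history_server_uri = "s3://example-data-bucket/spark_ui_logs/test-blog-post/"
print(same_log_location(job_log_uri, history_server_uri))
```

In the SageMaker Python SDK, the Spark processors provide a start_history_server method that takes a spark_event_logs_s3_uri argument, so the derived URI can be reused there.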

Clean up

If you followed the tutorial, it’s good practice to delete resources that are no longer used to stop incurring charges. Make sure to delete the CloudFormation stack that you used to create your resources. This deletes the stack as well as the resources it created.

Conclusion

In this post, we showed how to run a secure SageMaker Processing job using PySpark within SageMaker Pipelines. We also demonstrated how to optimize PySpark using Spark configurations and set up your Processing job to run in a secure networking configuration.

As a next step, explore how to automate the entire model lifecycle and how customers built secure and scalable MLOps platforms using SageMaker services.


About the Authors

Maren Suilmann is a Data Scientist at AWS Professional Services. She works with customers across industries unveiling the power of AI/ML to achieve their business outcomes. Maren has been with AWS since November 2019. In her spare time, she enjoys kickboxing, hiking to great views, and board game nights.


Maira Ladeira Tanke is an ML Specialist at AWS. With a background in data science, she has 9 years of experience architecting and building ML applications with customers across industries. As a technical lead, she helps customers accelerate their achievement of business value through emerging technologies and innovative solutions. In her free time, Maira enjoys traveling and spending time with her family someplace warm.


Pauline Ting is a Data Scientist in the AWS Professional Services team. She supports customers in achieving and accelerating their business outcomes by developing AI/ML solutions. In her spare time, Pauline enjoys traveling, surfing, and trying new dessert places.


Donald Fossouo is a Sr Data Architect in the AWS Professional Services team, mostly working with Global Finance Service. He engages with customers to create innovative solutions that address customer business problems and accelerate the adoption of AWS services. In his spare time, Donald enjoys reading, running, and traveling.
