From cda5c978fa390d7f95acda0da9b670e0a78c04b9 Mon Sep 17 00:00:00 2001 From: knikitiuk <36886708+KristinaNikitiuk@users.noreply.github.com> Date: Wed, 15 Feb 2023 14:44:19 +0300 Subject: [PATCH 1/6] notebook_helper + upsert nb pipeline --- mlops_utilities/actions.py | 42 ++ mlops_utilities/notebook_helper.py | 125 +++++ .../entrypoint.sh | 16 + .../hyperparams.json | 10 + .../processing_local_pipeline.ipynb | 186 +++++++ .../requirements.txt | 3 + .../training_local_pipeline_updated.ipynb | 167 +++++++ .../training_pipeline.defaults.yml | 17 + mlops_utilities/notebook_tests/test.ipynb | 272 ++++++++++ .../notebook_tests/test_training.ipynb | 468 ++++++++++++++++++ 10 files changed, 1306 insertions(+) create mode 100644 mlops_utilities/notebook_helper.py create mode 100644 mlops_utilities/notebook_test_processing_code/entrypoint.sh create mode 100644 mlops_utilities/notebook_test_processing_code/hyperparams.json create mode 100644 mlops_utilities/notebook_test_processing_code/processing_local_pipeline.ipynb create mode 100644 mlops_utilities/notebook_test_processing_code/requirements.txt create mode 100644 mlops_utilities/notebook_test_processing_code/training_local_pipeline_updated.ipynb create mode 100644 mlops_utilities/notebook_test_processing_code/training_pipeline.defaults.yml create mode 100644 mlops_utilities/notebook_tests/test.ipynb create mode 100644 mlops_utilities/notebook_tests/test_training.ipynb diff --git a/mlops_utilities/actions.py b/mlops_utilities/actions.py index 163ccee..68367a0 100644 --- a/mlops_utilities/actions.py +++ b/mlops_utilities/actions.py @@ -12,6 +12,7 @@ from sagemaker.workflow.pipeline_context import PipelineSession from mlops_utilities import helpers +import notebook_helper logger = logging.getLogger(__name__) @@ -118,6 +119,47 @@ def run_pipeline( return sagemaker_client.start_pipeline_execution(**start_pipe_args) +def upsert_notebook_pipeline( + pipeline_name: str, + notebook_path: str, + role: str, + nb_yml_config: str, + pipeline_tags: Optional[Dict[str, str]] = None, + image_uri: Optional[str] = None, + dryrun: bool = False, +): + """ + Local file will be uploaded to S3 using default bucket (configured) + Args: + notebook_path: local path to *.ipynb file + pipeline_name: see existing `upsert_pipeline` method + image_uri: ECR image URI that is built and pushed by the project CI + pipeline_tags: see existing `upsert_pipeline` method + dryrun: see existing `upsert_pipeline` method + """ + + sm_session = Session(default_bucket='kris-mlops-utilities-test') + + pipeline_steps = notebook_helper.compose_pipeline( + sm_session=sm_session, + role=role, + config_yml_path=nb_yml_config, + processing=True, + notebook_path=notebook_path + ) + + pipeline = notebook_helper.create_pipeline( + pipeline_name=pipeline_name, + sm_session=sm_session, + steps=pipeline_steps, + pipeline_params=[] + ) + if not dryrun: + if pipeline_tags is not None: + pipeline_tags = helpers.convert_param_dict_to_key_value_list(pipeline_tags) + pipeline.upsert(role_arn=role, tags=pipeline_tags) + + def deploy_model( sagemaker_session: Session, model_package_group_name: str, diff --git a/mlops_utilities/notebook_helper.py b/mlops_utilities/notebook_helper.py new file mode 100644 index 0000000..1344101 --- /dev/null +++ b/mlops_utilities/notebook_helper.py @@ -0,0 +1,125 @@ +import json +import os + +from omegaconf import OmegaConf +from sagemaker import Session, TrainingInput +from sagemaker.estimator import Estimator +from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput +from sagemaker.sklearn import SKLearn +from sagemaker.workflow.pipeline import Pipeline +from sagemaker.workflow.steps import ProcessingStep, TrainingStep + +PROCESSING_CONTAINER_DIR = "/opt/ml/processing" + + +def load_nb_config(nb_config_path: str): + return OmegaConf.load(nb_config_path) + + +def create_processor(sm_session: Session, role: str, nb_config_path: str) -> FrameworkProcessor: + nb_config = load_nb_config(nb_config_path) + return FrameworkProcessor( + estimator_cls=SKLearn, + framework_version="0.23-1", + role=role, + instance_count=nb_config.training.instance_count, + instance_type=nb_config.training.instance_type, + sagemaker_session=sm_session, + ) + + +def create_processing_step(processing_step_name: str, sm_session: Session, notebook_path: str, + role: str, nb_config_path: str) -> ProcessingStep: + return ProcessingStep( + processing_step_name, + processor=create_processor(sm_session, role, nb_config_path), + inputs=[ + ProcessingInput( + input_name="code", + source=notebook_path, + destination=os.path.join(PROCESSING_CONTAINER_DIR, "code"), + ), + ], + outputs=[ + ProcessingOutput( + output_name="output-data", + source=os.path.join(PROCESSING_CONTAINER_DIR, "output-data"), + ) + ], + code=os.path.join(notebook_path, "entrypoint.sh") + ) + + +def create_pipeline(pipeline_name: str, sm_session: Session, steps: list, pipeline_params: list) -> Pipeline: + return Pipeline( + name=pipeline_name, + parameters=pipeline_params, + steps=steps, + sagemaker_session=sm_session, + ) + + +def create_estimator(sm_session: Session, image_uri, role: str, nb_config_path: str, hyperparams_file: str = None): + nb_config = load_nb_config(nb_config_path) + if hyperparams_file: + with open(hyperparams_file) as json_file: + hyperparams_dict = json.load(json_file) + + return Estimator( + image_uri=image_uri, + instance_type=nb_config.processing.instance_type, + instance_count=nb_config.processing.instance_count, + base_job_name=f"notebook-train", + sagemaker_session=sm_session, + role=role, + hyperparameters=hyperparams_dict + ) + + +def create_training_step(train_step_name: str, sm_session: Session, image_uri: str, input_data_uri: str, + validation_data_uri: str, role: str, nb_config_path: str, hyperparams_file: str = None): + estimator = create_estimator(sm_session, image_uri, role, nb_config_path, hyperparams_file) + return TrainingStep( + name=train_step_name, + estimator=estimator, + inputs={ + "train": TrainingInput( + s3_data=input_data_uri, + content_type="text/csv", + ), + "validation": TrainingInput( + s3_data=validation_data_uri, + content_type="text/csv", + ), + }, + ) + + +def compose_pipeline(sm_session: Session, role: str, config_yml_path: str, processing: bool = False, + training: bool = False, image_uri: str = None, notebook_path: str = None, + hyperparams_file=None) -> list: + pipeline_steps = [] + if processing: + processing_step = create_processing_step( + processing_step_name='processing-nb-upsert', + sm_session=sm_session, + notebook_path=notebook_path, + role=role, + nb_config_path=config_yml_path + ) + pipeline_steps.append(processing_step) + + if training: + training_step = create_training_step( + train_step_name="training-nb-upsert", + sm_session=sm_session, + image_uri=image_uri, + input_data_uri='s3://kris-mlops-utilities-test/abalone_data/train', + validation_data_uri='s3://kris-mlops-utilities-test/abalone_data/test', + role=role, + nb_config_path=config_yml_path, + hyperparams_file=hyperparams_file + ) + pipeline_steps.append(training_step) + + return pipeline_steps diff --git a/mlops_utilities/notebook_test_processing_code/entrypoint.sh b/mlops_utilities/notebook_test_processing_code/entrypoint.sh new file mode 100644 index 0000000..12966ae --- /dev/null +++ b/mlops_utilities/notebook_test_processing_code/entrypoint.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +cd /opt/ml/processing/code/ +# Exit on any error. SageMaker uses error code to mark failed job. +set -e +if [[ -f 'requirements.txt' ]]; then + # Some py3 containers has typing, which may breaks pip install + pip uninstall --yes typing + pip install -r requirements.txt +fi + +pip install --upgrade pip ipython ipykernel +ipython kernel install --name "python3" --user + +papermill processing_local_pipeline.ipynb output_processing.ipynb -p role_param arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole -p output_bucket_path kris-mlops-utilities-test +#papermill training_local_pipeline_updated.ipynb output_training.ipynb -p role_param arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole -p output_bucket_path kris-mlops-utilities-test \ No newline at end of file diff --git a/mlops_utilities/notebook_test_processing_code/hyperparams.json b/mlops_utilities/notebook_test_processing_code/hyperparams.json new file mode 100644 index 0000000..de54811 --- /dev/null +++ b/mlops_utilities/notebook_test_processing_code/hyperparams.json @@ -0,0 +1,10 @@ +{ + "objective": "reg:linear", + "num_round": 50, + "max_depth": 5, + "eta":0.2, + "gamma":4, + "min_child_weight":6, + "subsample":0.7, + "silent":0 +} \ No newline at end of file diff --git a/mlops_utilities/notebook_test_processing_code/processing_local_pipeline.ipynb b/mlops_utilities/notebook_test_processing_code/processing_local_pipeline.ipynb new file mode 100644 index 0000000..5c91fd2 --- /dev/null +++ b/mlops_utilities/notebook_test_processing_code/processing_local_pipeline.ipynb @@ -0,0 +1,186 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 13, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + }, + "tags": "parameters" + }, + "outputs": [], + "source": [ + "# default values\n", + "role_param = \"\"\n", + "output_bucket_path = \"\"" + ] + }, + { + "cell_type": "code", + "execution_count": 44, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 5.88 ms, sys: 1.38 ms, total: 7.26 ms\n", + "Wall time: 6.5 ms\n" + ] + } + ], + "source": [ + "%%time\n", + "import boto3\n", + "from time import gmtime, strftime\n", + "from sagemaker import image_uris\n", + "processing_job_name = f\"processing-job-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", + "region = \"us-east-1\"\n", + "role = role_param\n", + "client = boto3.client(\"sagemaker\", region_name=region)\n", + "deploy_amt_model = True\n", + "output_prefix = \"local-pipeline\"\n" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "outputs": [], + "source": [ + "image_uri = image_uris.retrieve(\n", + " framework=\"xgboost\",\n", + " region=\"us-east-1\",\n", + " version=\"1.0-1\",\n", + " py_version=\"py3\",\n", + " instance_type=\"ml.m5.large\",\n", + ")" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 46, + "outputs": [], + "source": [ + "create_processing_params = {\n", + " \"AppSpecification\": {\n", + " 'ImageUri': image_uri\n", + " },\n", + " \"ProcessingInputs\": [\n", + " {\n", + " \"InputName\": \"abalone.train\",\n", + " \"S3Input\": {\n", + " \"LocalPath\": \"/opt/ml/processing/input\",\n", + " \"S3DataDistributionType\": \"FullyReplicated\",\n", + " \"S3InputMode\": \"File\",\n", + " 'S3DataType': \"S3Prefix\",\n", + " \"S3Uri\": \"s3://kris-mlops-utilities-test/abalone_data/train\"\n", + " }\n", + " }\n", + " ],\n", + " \"StoppingCondition\": {\n", + " 'MaxRuntimeInSeconds': 123\n", + " },\n", + " \"ProcessingJobName\": processing_job_name,\n", + " \"ProcessingOutputConfig\": {\n", + " \"Outputs\": [\n", + " {\n", + " \"OutputName\": \"preprocessing_output\",\n", + " \"S3Output\": {\n", + " \"LocalPath\": \"/opt/ml/processing/output\",\n", + " \"S3Uri\": f\"s3://{output_bucket_path}/preprocessing\",\n", + " \"S3UploadMode\": \"EndOfJob\"\n", + " }\n", + " }\n", + " ]\n", + " },\n", + " \"ProcessingResources\": {\n", + " \"ClusterConfig\": {\n", + " \"InstanceCount\": 1,\n", + " \"InstanceType\": \"ml.m5.large\",\n", + " \"VolumeSizeInGB\": 5\n", + " }\n", + " },\n", + " \"RoleArn\": role,\n", + "}" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 48, + "outputs": [ + { + "data": { + "text/plain": "'processing-job-2023-02-07-14-07-04'" + }, + "execution_count": 48, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "processing_job_name" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 49, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "print(f\"Creating a training job with name: {processing_job_name}. It will take between 5 and 6 minutes to complete.\")\n", + "client.create_processing_job(**create_processing_params)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/mlops_utilities/notebook_test_processing_code/requirements.txt b/mlops_utilities/notebook_test_processing_code/requirements.txt new file mode 100644 index 0000000..4bc11ff --- /dev/null +++ b/mlops_utilities/notebook_test_processing_code/requirements.txt @@ -0,0 +1,3 @@ +sagemaker==2.89.0 +boto3==1.23.0 +papermill==2.4.0 \ No newline at end of file diff --git a/mlops_utilities/notebook_test_processing_code/training_local_pipeline_updated.ipynb b/mlops_utilities/notebook_test_processing_code/training_local_pipeline_updated.ipynb new file mode 100644 index 0000000..5fb5dc0 --- /dev/null +++ b/mlops_utilities/notebook_test_processing_code/training_local_pipeline_updated.ipynb @@ -0,0 +1,167 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + }, + "tags": "parameters" + }, + "outputs": [], + "source": [ + "# default values\n", + "role_param = \"\"\n", + "output_bucket_path = \"\"" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 456 ms, sys: 303 ms, total: 758 ms\n", + "Wall time: 1.15 s\n" + ] + } + ], + "source": [ + "%%time\n", + "import boto3\n", + "from time import gmtime, strftime\n", + "import time\n", + "from sagemaker import image_uris\n", + "\n", + "training_job_name = f\"notebook-training-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", + "region = \"us-east-1\"\n", + "role = role_param\n", + "client = boto3.client(\"sagemaker\", region_name=region)\n", + "deploy_amt_model = True\n", + "output_prefix = \"local-pipeline\"\n", + "container = image_uris.retrieve(\"xgboost\", region, \"1.5-1\")\n", + "# s3://sagemaker-mlops-p-vicbs68pvwtg/abalonedata/data.csv" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [], + "source": [ + "create_training_params = {\n", + " \"AlgorithmSpecification\": {\"TrainingImage\": container, \"TrainingInputMode\": \"File\"},\n", + " \"RoleArn\": role,\n", + " \"OutputDataConfig\": {\"S3OutputPath\": f\"s3://{output_bucket_path}/single-xgboost\"},\n", + " \"ResourceConfig\": {\"InstanceCount\": 1, \"InstanceType\": \"ml.m5.2xlarge\", \"VolumeSizeInGB\": 5},\n", + " \"TrainingJobName\": training_job_name,\n", + " \"HyperParameters\": {\n", + " \"max_depth\": \"5\",\n", + " \"eta\": \"0.2\",\n", + " \"gamma\": \"4\",\n", + " \"min_child_weight\": \"6\",\n", + " \"subsample\": \"0.7\",\n", + " \"objective\": \"reg:linear\",\n", + " \"num_round\": \"50\",\n", + " \"verbosity\": \"2\",\n", + " },\n", + " \"StoppingCondition\": {\"MaxRuntimeInSeconds\": 3600},\n", + " \"InputDataConfig\": [\n", + " {\n", + " \"ChannelName\": \"train\",\n", + " \"DataSource\": {\n", + " \"S3DataSource\": {\n", + " \"S3DataType\": \"S3Prefix\",\n", + " \"S3Uri\": f\"s3://{output_bucket_path}/abalone_data/train\",\n", + " \"S3DataDistributionType\": \"FullyReplicated\",\n", + " }\n", + " },\n", + " \"ContentType\": \"libsvm\",\n", + " \"CompressionType\": \"None\",\n", + " },\n", + " {\n", + " \"ChannelName\": \"validation\",\n", + " \"DataSource\": {\n", + " \"S3DataSource\": {\n", + " \"S3DataType\": \"S3Prefix\",\n", + " \"S3Uri\": f\"s3://{output_bucket_path}/abalone_data/validation\",\n", + " \"S3DataDistributionType\": \"FullyReplicated\",\n", + " }\n", + " },\n", + " \"ContentType\": \"libsvm\",\n", + " \"CompressionType\": \"None\",\n", + " },\n", + " ],\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating a training job with name: DEMO-xgboost-regression-2023-01-23-15-13-28. It will take between 5 and 6 minutes to complete.\n", + "InProgress\n", + "InProgress\n", + "InProgress\n", + "Completed\n" + ] + } + ], + "source": [ + "print(f\"Creating a training job with name: {training_job_name}. It will take between 5 and 6 minutes to complete.\")\n", + "client.create_training_job(**create_training_params)\n", + "status = client.describe_training_job(TrainingJobName=training_job_name)[\"TrainingJobStatus\"]\n", + "while status != \"Completed\" and status != \"Failed\":\n", + " time.sleep(60)\n", + " status = client.describe_training_job(TrainingJobName=training_job_name)[\"TrainingJobStatus\"]\n", + " print(status)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/mlops_utilities/notebook_test_processing_code/training_pipeline.defaults.yml b/mlops_utilities/notebook_test_processing_code/training_pipeline.defaults.yml new file mode 100644 index 0000000..5b6ed02 --- /dev/null +++ b/mlops_utilities/notebook_test_processing_code/training_pipeline.defaults.yml @@ -0,0 +1,17 @@ +pipeline: + default_bucket: ??? + role: ??? + cache_config: + enable_caching: True + expire_after: p1d + model_package_group_name: ??? + +processing: + instance_count: 1 + instance_type: ml.t3.medium + role: ${pipeline.role} + +training: + instance_count: 1 + instance_type: ml.m5.large + role: ${pipeline.role} diff --git a/mlops_utilities/notebook_tests/test.ipynb b/mlops_utilities/notebook_tests/test.ipynb new file mode 100644 index 0000000..9633c43 --- /dev/null +++ b/mlops_utilities/notebook_tests/test.ipynb @@ -0,0 +1,272 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 36, + "outputs": [], + "source": [ + "from sagemaker.workflow.steps import ProcessingStep\n", + "from sagemaker.workflow.pipeline import Pipeline\n", + "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", + "from sagemaker.processing import FrameworkProcessor\n", + "from sagemaker.sklearn import SKLearn\n", + "from sagemaker import Session\n", + "\n", + "import os" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 37, + "outputs": [], + "source": [ + "PROCESSING_CONTAINER_DIR = \"/opt/ml/processing\"\n", + "PREPROCESSING_COMPONENT_SOURCE_DIR = \"/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code\"" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 38, + "outputs": [], + "source": [ + "def create_processor(sagemaker_session) -> FrameworkProcessor:\n", + " return FrameworkProcessor(\n", + " estimator_cls=SKLearn,\n", + " framework_version=\"0.23-1\",\n", + " role=\"arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole\",\n", + " instance_count=1,\n", + " instance_type=\"ml.t3.medium\",\n", + " sagemaker_session=sagemaker_session,\n", + " )" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 39, + "outputs": [], + "source": [ + "\n", + "sm_session = Session(default_bucket='kris-mlops-utilities-test')\n", + "\n", + "notebook_processing = ProcessingStep(\n", + " \"NotebookProcessing\",\n", + " processor=create_processor(sm_session),\n", + " inputs=[\n", + " ProcessingInput(\n", + " input_name=\"code\",\n", + " source=PREPROCESSING_COMPONENT_SOURCE_DIR,\n", + " destination=os.path.join(PROCESSING_CONTAINER_DIR, \"code\"),\n", + " ),\n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(\n", + " output_name=\"output-data\",\n", + " source=os.path.join(PROCESSING_CONTAINER_DIR, \"output-data\"),\n", + " )\n", + " ],\n", + " code=os.path.join(PREPROCESSING_COMPONENT_SOURCE_DIR, \"entrypoint.sh\")\n", + " )" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 40, + "outputs": [], + "source": [ + "\n", + "pipeline = Pipeline(\n", + " name='processing-notebook-pipeline',\n", + " parameters=[],\n", + " steps=[\n", + " notebook_processing\n", + " ],\n", + " sagemaker_session=sm_session,\n", + ")\n" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 41, + "outputs": [ + { + "data": { + "text/plain": "{'PipelineArn': 'arn:aws:sagemaker:us-east-1:311638508164:pipeline/processing-notebook-pipeline',\n 'ResponseMetadata': {'RequestId': '9074f41b-71fa-419c-b420-06c811db7a48',\n 'HTTPStatusCode': 200,\n 'HTTPHeaders': {'x-amzn-requestid': '9074f41b-71fa-419c-b420-06c811db7a48',\n 'content-type': 'application/x-amz-json-1.1',\n 'content-length': '96',\n 'date': 'Fri, 10 Feb 2023 13:23:20 GMT'},\n 'RetryAttempts': 0}}" + }, + "execution_count": 41, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pipeline.upsert(role_arn='arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole')" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 3, + "outputs": [ + { + "data": { + "text/plain": "{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:311638508164:pipeline/test-upsert/execution/wuiix8q6ji5x',\n 'ResponseMetadata': {'RequestId': 'ed7701b7-4c3a-4c75-a742-08bd7e0c2d68',\n 'HTTPStatusCode': 200,\n 'HTTPHeaders': {'x-amzn-requestid': 'ed7701b7-4c3a-4c75-a742-08bd7e0c2d68',\n 'content-type': 'application/x-amz-json-1.1',\n 'content-length': '111',\n 'date': 'Mon, 13 Feb 2023 12:32:49 GMT'},\n 'RetryAttempts': 0}}" + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from actions import run_pipeline\n", + "\n", + "run_pipeline(pipeline_name='test-upsert', execution_name_prefix='test', pipeline_params={})" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 2, + "outputs": [], + "source": [ + "from actions import upsert_notebook_pipeline\n", + "#\n", + "upsert_notebook_pipeline(pipeline_name='test-upsert', notebook_path='/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code/', config_type='')" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 4, + "outputs": [], + "source": [ + "pipeline_role = 'arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole'" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 9, + "outputs": [], + "source": [ + "from omegaconf import OmegaConf\n", + "\n", + "default_conf_path = '/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code/training_pipeline.defaults.yml'\n", + "default_conf = OmegaConf.load(default_conf_path)\n", + "# arg_conf = OmegaConf.create({'pipeline': {'role': pipeline_role}})\n", + "# override_arg_conf = OmegaConf.from_dotlist(args)\n", + "# return OmegaConf.merge(default_conf, arg_conf, override_arg_conf)" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 10, + "outputs": [ + { + "data": { + "text/plain": "{'pipeline': {'default_bucket': '???', 'role': '???', 'cache_config': {'enable_caching': True, 'expire_after': 'p1d'}, 'model_package_group_name': '???'}, 'featurizing': {'instance_count': 1, 'instance_type': 'ml.t3.medium', 'role': '${pipeline.role}'}, 'training': {'instance_count': 1, 'instance_type': 'ml.m5.large', 'role': '${pipeline.role}'}, 'model': {'instance_count': 1, 'instance_type': 'ml.m5.large', 'role': '${pipeline.role}'}, 'clarify': {'instance_count': 1, 'instance_type': 'ml.m5.large', 'role': '${pipeline.role}'}, 'monitor': {'role': '${pipeline.role}'}}" + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "default_conf" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/mlops_utilities/notebook_tests/test_training.ipynb b/mlops_utilities/notebook_tests/test_training.ipynb new file mode 100644 index 0000000..2815bed --- /dev/null +++ b/mlops_utilities/notebook_tests/test_training.ipynb @@ -0,0 +1,468 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "outputs": [], + "source": [ + "from sagemaker.workflow.steps import TrainingStep\n", + "from sagemaker.workflow.pipeline import Pipeline\n", + "from sagemaker import Session, image_uris\n" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 2, + "outputs": [], + "source": [ + "PROCESSING_CONTAINER_DIR = \"/opt/ml/processing\"\n", + "PREPROCESSING_COMPONENT_SOURCE_DIR = \"/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code\"" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 3, + "outputs": [], + "source": [ + "sm_session = Session(default_bucket='kris-mlops-utilities-test')\n" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 4, + "outputs": [], + "source": [ + "image_uri = image_uris.retrieve(\n", + " framework=\"xgboost\",\n", + " region=\"us-east-1\",\n", + " version=\"1.0-1\",\n", + " py_version=\"py3\",\n", + " instance_type=\"ml.m5.large\",\n", + " )" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 5, + "outputs": [], + "source": [ + "from sagemaker.estimator import Estimator\n", + "\n", + "xgb_train = Estimator(\n", + " image_uri=image_uri,\n", + " instance_type=\"ml.m5.large\",\n", + " instance_count=1,\n", + " base_job_name=f\"notebook-train\",\n", + " sagemaker_session=sm_session,\n", + " role=\"arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole\",\n", + ")\n", + "\n", + "xgb_train.set_hyperparameters(\n", + " objective=\"reg:linear\",\n", + " num_round=50,\n", + " max_depth=5,\n", + " eta=0.2,\n", + " gamma=4,\n", + " min_child_weight=6,\n", + " subsample=0.7,\n", + " silent=0,\n", + ")" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 6, + "outputs": [], + "source": [ + "from sagemaker import TrainingInput\n", + "\n", + "notebook_training = TrainingStep(\n", + " name=\"TrainModel\",\n", + " estimator=xgb_train,\n", + " inputs={\n", + " \"train\": TrainingInput(\n", + " s3_data=\"s3://kris-mlops-utilities-test/abalone_data/train\",\n", + " content_type=\"text/csv\",\n", + " ),\n", + " \"validation\": TrainingInput(\n", + " s3_data=\"s3://kris-mlops-utilities-test/abalone_data/test\",\n", + " content_type=\"text/csv\",\n", + " ),\n", + " },\n", + ")" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 7, + "outputs": [], + "source": [ + "\n", + "pipeline = Pipeline(\n", + " name='training-notebook-pipeline',\n", + " parameters=[],\n", + " steps=[\n", + " notebook_training\n", + " ],\n", + " sagemaker_session=sm_session,\n", + ")\n" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 8, + "outputs": [ + { + "data": { + "text/plain": "{'PipelineArn': 'arn:aws:sagemaker:us-east-1:311638508164:pipeline/training-notebook-pipeline',\n 'ResponseMetadata': {'RequestId': 'd9406cb9-7302-4a2a-9748-f4fac5ae6845',\n 'HTTPStatusCode': 200,\n 'HTTPHeaders': {'x-amzn-requestid': 'd9406cb9-7302-4a2a-9748-f4fac5ae6845',\n 'content-type': 'application/x-amz-json-1.1',\n 'content-length': '94',\n 'date': 'Fri, 10 Feb 2023 14:01:39 GMT'},\n 'RetryAttempts': 0}}" + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pipeline.upsert(role_arn='arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole')" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 9, + "outputs": [ + { + "data": { + "text/plain": "{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:311638508164:pipeline/training-notebook-pipeline/execution/kd97j0rnhzol',\n 'ResponseMetadata': {'RequestId': '49506d08-3b16-4e48-b827-868fe66467d5',\n 'HTTPStatusCode': 200,\n 'HTTPHeaders': {'x-amzn-requestid': '49506d08-3b16-4e48-b827-868fe66467d5',\n 'content-type': 'application/x-amz-json-1.1',\n 'content-length': '126',\n 'date': 'Fri, 10 Feb 2023 14:01:44 GMT'},\n 'RetryAttempts': 0}}" + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from actions import run_pipeline\n", + "\n", + "run_pipeline(pipeline_name='training-notebook-pipeline', execution_name_prefix='train', pipeline_params={})" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 6, + "outputs": [ + { + "ename": "ValueError", + "evalue": "Either step_args or estimator need to be given.", + "output_type": "error", + "traceback": [ + "\u001B[0;31m---------------------------------------------------------------------------\u001B[0m", + "\u001B[0;31mValueError\u001B[0m Traceback (most recent call last)", + "Cell \u001B[0;32mIn [6], line 2\u001B[0m\n\u001B[1;32m 1\u001B[0m \u001B[38;5;28;01mfrom\u001B[39;00m \u001B[38;5;21;01mactions\u001B[39;00m \u001B[38;5;28;01mimport\u001B[39;00m upsert_notebook_pipeline\n\u001B[0;32m----> 2\u001B[0m \u001B[43mupsert_notebook_pipeline\u001B[49m\u001B[43m(\u001B[49m\u001B[43mpipeline_name\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[38;5;124;43mtest-upsert\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mstep_name\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[38;5;124;43mtraining_step\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mnotebook_path\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[38;5;124;43m/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code/\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mconfig_type\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[43m,\u001B[49m\u001B[43m \u001B[49m\u001B[43mimage_uri\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mimage_uri\u001B[49m\u001B[43m)\u001B[49m\n", + "File \u001B[0;32m~/workspace/mlops-utilities/mlops_utilities/actions.py:112\u001B[0m, in \u001B[0;36mupsert_notebook_pipeline\u001B[0;34m(pipeline_name, step_name, notebook_path, config_type, pipeline_tags, image_uri, is_training, dryrun)\u001B[0m\n\u001B[1;32m 105\u001B[0m \u001B[38;5;66;03m# processing_step = notebook_helper.create_processor_step(processing_step_name=step_name,\u001B[39;00m\n\u001B[1;32m 106\u001B[0m \u001B[38;5;66;03m# sm_session=sm_session,\u001B[39;00m\n\u001B[1;32m 107\u001B[0m \u001B[38;5;66;03m# notebook_path=notebook_path)\u001B[39;00m\n\u001B[1;32m 109\u001B[0m training_estimator \u001B[38;5;241m=\u001B[39m notebook_helper\u001B[38;5;241m.\u001B[39mcreate_estimator(sm_session\u001B[38;5;241m=\u001B[39msm_session,\n\u001B[1;32m 110\u001B[0m image_uri\u001B[38;5;241m=\u001B[39mimage_uri)\n\u001B[0;32m--> 112\u001B[0m training_step \u001B[38;5;241m=\u001B[39m \u001B[43mnotebook_helper\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mcreate_training_step\u001B[49m\u001B[43m(\u001B[49m\u001B[43mestimator\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mtraining_estimator\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 113\u001B[0m \u001B[43m \u001B[49m\u001B[43minput_data_uri\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43mf\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[38;5;124;43ms3://\u001B[39;49m\u001B[38;5;132;43;01m{\u001B[39;49;00m\u001B[43msm_session\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mdefault_bucket\u001B[49m\u001B[38;5;132;43;01m}\u001B[39;49;00m\u001B[38;5;124;43m/abalone_data/train\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[43m,\u001B[49m\n\u001B[1;32m 114\u001B[0m \u001B[43m \u001B[49m\u001B[43mvalidation_data_uri\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43mf\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[38;5;124;43ms3://\u001B[39;49m\u001B[38;5;132;43;01m{\u001B[39;49;00m\u001B[43msm_session\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43mdefault_bucket\u001B[49m\u001B[38;5;132;43;01m}\u001B[39;49;00m\u001B[38;5;124;43m/abalone_data/test\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[43m)\u001B[49m\n\u001B[1;32m 116\u001B[0m pipeline \u001B[38;5;241m=\u001B[39m notebook_helper\u001B[38;5;241m.\u001B[39mcreate_pipeline(pipeline_name\u001B[38;5;241m=\u001B[39mpipeline_name,\n\u001B[1;32m 117\u001B[0m sm_session\u001B[38;5;241m=\u001B[39msm_session,\n\u001B[1;32m 118\u001B[0m steps\u001B[38;5;241m=\u001B[39m[\n\u001B[0;32m (...)\u001B[0m\n\u001B[1;32m 121\u001B[0m ],\n\u001B[1;32m 122\u001B[0m pipeline_params\u001B[38;5;241m=\u001B[39m[])\n\u001B[1;32m 124\u001B[0m pipeline\u001B[38;5;241m.\u001B[39mupsert(\u001B[38;5;124m'\u001B[39m\u001B[38;5;124marn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole\u001B[39m\u001B[38;5;124m'\u001B[39m, tags\u001B[38;5;241m=\u001B[39mpipeline_tags)\n", + "File \u001B[0;32m~/workspace/mlops-utilities/mlops_utilities/notebook_helper.py:78\u001B[0m, in \u001B[0;36mcreate_training_step\u001B[0;34m(estimator, input_data_uri, validation_data_uri)\u001B[0m\n\u001B[1;32m 77\u001B[0m \u001B[38;5;28;01mdef\u001B[39;00m \u001B[38;5;21mcreate_training_step\u001B[39m(estimator: Estimator, input_data_uri: \u001B[38;5;28mstr\u001B[39m, validation_data_uri: \u001B[38;5;28mstr\u001B[39m):\n\u001B[0;32m---> 78\u001B[0m \u001B[38;5;28;01mreturn\u001B[39;00m \u001B[43mTrainingStep\u001B[49m\u001B[43m(\u001B[49m\n\u001B[1;32m 79\u001B[0m \u001B[43m \u001B[49m\u001B[43mname\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[38;5;124;43mTrainModel\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[43m,\u001B[49m\n\u001B[1;32m 80\u001B[0m \u001B[43m \u001B[49m\u001B[43mestimator\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mestimator\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 81\u001B[0m \u001B[43m \u001B[49m\u001B[43minputs\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43m{\u001B[49m\n\u001B[1;32m 82\u001B[0m \u001B[43m \u001B[49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[38;5;124;43mtrain\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[43m:\u001B[49m\u001B[43m \u001B[49m\u001B[43mTrainingInput\u001B[49m\u001B[43m(\u001B[49m\n\u001B[1;32m 83\u001B[0m \u001B[43m \u001B[49m\u001B[43ms3_data\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43minput_data_uri\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 84\u001B[0m \u001B[43m \u001B[49m\u001B[43mcontent_type\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[38;5;124;43mtext/csv\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[43m,\u001B[49m\n\u001B[1;32m 85\u001B[0m \u001B[43m \u001B[49m\u001B[43m)\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 86\u001B[0m \u001B[43m \u001B[49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[38;5;124;43mvalidation\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[43m:\u001B[49m\u001B[43m \u001B[49m\u001B[43mTrainingInput\u001B[49m\u001B[43m(\u001B[49m\n\u001B[1;32m 87\u001B[0m \u001B[43m \u001B[49m\u001B[43ms3_data\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[43mvalidation_data_uri\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 88\u001B[0m \u001B[43m \u001B[49m\u001B[43mcontent_type\u001B[49m\u001B[38;5;241;43m=\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[38;5;124;43mtext/csv\u001B[39;49m\u001B[38;5;124;43m\"\u001B[39;49m\u001B[43m,\u001B[49m\n\u001B[1;32m 89\u001B[0m \u001B[43m \u001B[49m\u001B[43m)\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 90\u001B[0m \u001B[43m \u001B[49m\u001B[43m}\u001B[49m\u001B[43m,\u001B[49m\n\u001B[1;32m 91\u001B[0m \u001B[43m \u001B[49m\u001B[43m)\u001B[49m\n", + "File \u001B[0;32m~/workspace/mlops-utilities/venv/lib/python3.8/site-packages/sagemaker/workflow/steps.py:415\u001B[0m, in \u001B[0;36mTrainingStep.__init__\u001B[0;34m(self, name, step_args, estimator, display_name, description, inputs, cache_config, depends_on, retry_policies)\u001B[0m\n\u001B[1;32m 410\u001B[0m \u001B[38;5;28msuper\u001B[39m(TrainingStep, \u001B[38;5;28mself\u001B[39m)\u001B[38;5;241m.\u001B[39m\u001B[38;5;21m__init__\u001B[39m(\n\u001B[1;32m 411\u001B[0m name, StepTypeEnum\u001B[38;5;241m.\u001B[39mTRAINING, display_name, description, depends_on, retry_policies\n\u001B[1;32m 412\u001B[0m )\n\u001B[1;32m 414\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m (step_args \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m) \u001B[38;5;241m^\u001B[39m (estimator \u001B[38;5;129;01mis\u001B[39;00m \u001B[38;5;129;01mnot\u001B[39;00m \u001B[38;5;28;01mNone\u001B[39;00m):\n\u001B[0;32m--> 415\u001B[0m \u001B[38;5;28;01mraise\u001B[39;00m \u001B[38;5;167;01mValueError\u001B[39;00m(\u001B[38;5;124m\"\u001B[39m\u001B[38;5;124mEither step_args or estimator need to be given.\u001B[39m\u001B[38;5;124m\"\u001B[39m)\n\u001B[1;32m 417\u001B[0m \u001B[38;5;28;01mif\u001B[39;00m step_args:\n\u001B[1;32m 418\u001B[0m \u001B[38;5;28;01mfrom\u001B[39;00m \u001B[38;5;21;01msagemaker\u001B[39;00m\u001B[38;5;21;01m.\u001B[39;00m\u001B[38;5;21;01mworkflow\u001B[39;00m\u001B[38;5;21;01m.\u001B[39;00m\u001B[38;5;21;01mutilities\u001B[39;00m \u001B[38;5;28;01mimport\u001B[39;00m validate_step_args_input\n", + "\u001B[0;31mValueError\u001B[0m: Either step_args or estimator need to be given." + ] + } + ], + "source": [ + "from actions import upsert_notebook_pipeline\n", + "upsert_notebook_pipeline(pipeline_name='test-upsert', step_name='training_step', notebook_path='/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code/', config_type='', image_uri=image_uri)" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# pipeline_name: str,\n", + "# step_name: str,\n", + "# notebook_path: str,\n", + "# config_type: str,\n", + "# pipeline_tags: Optional[Dict[str, str]] = None,\n", + "# image_uri: Optional[str] = None,\n", + "# is_training: bool = False,\n", + "# dryrun: bool = False," + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 8, + "outputs": [], + "source": [ + "from actions import upsert_notebook_pipeline\n", + "upsert_notebook_pipeline(pipeline_name='test-upsert', notebook_path='/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code/', config_type='', image_uri=image_uri)" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 9, + "outputs": [ + { + "data": { + "text/plain": "{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:311638508164:pipeline/test-upsert/execution/nqid8tg82rhm',\n 'ResponseMetadata': {'RequestId': 'dc07b084-26e4-4917-ab4d-62ed4bf76ae6',\n 'HTTPStatusCode': 200,\n 'HTTPHeaders': {'x-amzn-requestid': 'dc07b084-26e4-4917-ab4d-62ed4bf76ae6',\n 'content-type': 'application/x-amz-json-1.1',\n 'content-length': '111',\n 'date': 'Mon, 13 Feb 2023 12:31:30 GMT'},\n 'RetryAttempts': 0}}" + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from actions import run_pipeline\n", + "\n", + "run_pipeline(pipeline_name='test-upsert', execution_name_prefix='test', pipeline_params={})" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# sm_session: Session, image_uri, role: str, nb_config_path: str, hyperparams_file: str" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 5, + "outputs": [ + { + "data": { + "text/plain": "" + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from notebook_helper import create_estimator\n", + "\n", + "create_estimator(sm_session=sm_session, image_uri=image_uri, role=\"arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole\", nb_config_path=\"/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code/training_pipeline.defaults.yml\", hyperparams_file=\"/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code/hyperparams.json\")" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "#pipeline_name: str,\n", + " # notebook_path: str,\n", + " # role: str,\n", + " # nb_yml_config: str,\n", + " # pipeline_tags: Optional[Dict[str, str]] = None,\n", + " # image_uri: Optional[str] = None,\n", + " # dryrun: bool = False," + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 1, + "outputs": [], + "source": [ + "role = \"arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole\"\n", + "nb_config_path = 'processing_code/training_pipeline.defaults.yml'" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 2, + "outputs": [], + "source": [ + "from actions import upsert_notebook_pipeline\n", + "\n", + "upsert_notebook_pipeline(pipeline_name='nb-test',\n", + " notebook_path ='/Users/knikitiuk/workspace/mlops-utilities/mlops_utilities/processing_code',\n", + " role=role,\n", + " nb_yml_config=nb_config_path,\n", + " pipeline_tags=None, image_uri=None, dryrun=False)" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 8, + "outputs": [ + { + "data": { + "text/plain": "{'pipeline': {'default_bucket': '???', 'role': '???', 'cache_config': {'enable_caching': True, 'expire_after': 'p1d'}, 'model_package_group_name': '???'}, 'processing': {'instance_count': 1, 'instance_type': 'ml.t3.medium', 'role': '${pipeline.role}'}, 'training': {'instance_count': 1, 'instance_type': 'ml.m5.large', 'role': '${pipeline.role}'}}" + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from omegaconf import OmegaConf\n", + "\n", + "OmegaConf.load('processing_code/training_pipeline.defaults.yml')" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": 7, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "__init__.py helpers.py \u001B[34mprocessing_code\u001B[m\u001B[m test_training.ipynb\r\n", + "actions.py notebook_helper.py test.ipynb\r\n" + ] + } + ], + "source": [ + "!ls" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file From e703ead2947c1dd88ff0b62f76aa6e4f08eabe12 Mon Sep 17 00:00:00 2001 From: knikitiuk <36886708+KristinaNikitiuk@users.noreply.github.com> Date: Fri, 24 Feb 2023 16:15:10 +0300 Subject: [PATCH 2/6] code clean up --- mlops_utilities/actions.py | 5 +++-- mlops_utilities/notebook_helper.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/mlops_utilities/actions.py b/mlops_utilities/actions.py index 68367a0..e3a33af 100644 --- a/mlops_utilities/actions.py +++ b/mlops_utilities/actions.py @@ -11,8 +11,8 @@ from sagemaker.model_monitor import DataCaptureConfig from sagemaker.workflow.pipeline_context import PipelineSession -from mlops_utilities import helpers import notebook_helper +from mlops_utilities import helpers logger = logging.getLogger(__name__) @@ -145,7 +145,8 @@ def upsert_notebook_pipeline( role=role, config_yml_path=nb_yml_config, processing=True, - notebook_path=notebook_path + notebook_path=notebook_path, + image_uri=None ) pipeline = notebook_helper.create_pipeline( diff --git a/mlops_utilities/notebook_helper.py b/mlops_utilities/notebook_helper.py index 1344101..042d551 100644 --- a/mlops_utilities/notebook_helper.py +++ b/mlops_utilities/notebook_helper.py @@ -62,14 +62,14 @@ def create_pipeline(pipeline_name: str, sm_session: Session, steps: list, pipeli def create_estimator(sm_session: Session, image_uri, role: str, nb_config_path: str, hyperparams_file: str = None): nb_config = load_nb_config(nb_config_path) if hyperparams_file: - with open(hyperparams_file) as json_file: + with open(hyperparams_file, encoding='utf-8') as json_file: hyperparams_dict = json.load(json_file) return Estimator( image_uri=image_uri, instance_type=nb_config.processing.instance_type, instance_count=nb_config.processing.instance_count, - base_job_name=f"notebook-train", + base_job_name="notebook-train", sagemaker_session=sm_session, role=role, hyperparameters=hyperparams_dict From 1edaade405145337b0f06df70041c357d16033de Mon Sep 17 00:00:00 2001 From: knikitiuk <36886708+KristinaNikitiuk@users.noreply.github.com> Date: Fri, 24 Feb 2023 16:54:40 +0300 Subject: [PATCH 3/6] docstrings --- mlops_utilities/notebook_helper.py | 83 +++++++++++++++++++++++ mlops_utilities/notebook_tests/test.ipynb | 2 +- 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/mlops_utilities/notebook_helper.py b/mlops_utilities/notebook_helper.py index 042d551..84f927e 100644 --- a/mlops_utilities/notebook_helper.py +++ b/mlops_utilities/notebook_helper.py @@ -13,10 +13,28 @@ def load_nb_config(nb_config_path: str): + """ + + Args: + nb_config_path: local path of notebook yml configs + + Returns: + loaded yml configs + """ return OmegaConf.load(nb_config_path) def create_processor(sm_session: Session, role: str, nb_config_path: str) -> FrameworkProcessor: + """ + + Args: + sm_session: sagemaker session + role: role arn + nb_config_path: local path of notebook yml configs + + Returns: + + """ nb_config = load_nb_config(nb_config_path) return FrameworkProcessor( estimator_cls=SKLearn, @@ -30,6 +48,18 @@ def create_processor(sm_session: Session, role: str, nb_config_path: str) -> Fra def create_processing_step(processing_step_name: str, sm_session: Session, notebook_path: str, role: str, nb_config_path: str) -> ProcessingStep: + """ + + Args: + processing_step_name: processing step name + sm_session: sagemaker session + notebook_path: local path of jupyter notebook + role: role arn + nb_config_path: local path of notebook yml configs + + Returns: + sagemaker processing job + """ return ProcessingStep( processing_step_name, processor=create_processor(sm_session, role, nb_config_path), @@ -51,6 +81,17 @@ def create_processing_step(processing_step_name: str, sm_session: Session, noteb def create_pipeline(pipeline_name: str, sm_session: Session, steps: list, pipeline_params: list) -> Pipeline: + """ + + Args: + pipeline_name: pipeline name + sm_session: sagemaker session + steps: list of composed steps from jupyter notebook + pipeline_params: pipeline params + + Returns: + sagemaker pipeline + """ return Pipeline( name=pipeline_name, parameters=pipeline_params, @@ -60,6 +101,18 @@ def create_pipeline(pipeline_name: str, sm_session: Session, steps: list, pipeli def create_estimator(sm_session: Session, image_uri, role: str, nb_config_path: str, hyperparams_file: str = None): + """ + + Args: + sm_session: sagemaker session + image_uri: uri of docker image pushed to sagemaker + role: role arn + nb_config_path: local path of notebook yml configs + hyperparams_file: file with hyperparameters for model + + Returns: + estimator for training job + """ nb_config = load_nb_config(nb_config_path) if hyperparams_file: with open(hyperparams_file, encoding='utf-8') as json_file: @@ -78,6 +131,21 @@ def create_estimator(sm_session: Session, image_uri, role: str, nb_config_path: def create_training_step(train_step_name: str, sm_session: Session, image_uri: str, input_data_uri: str, validation_data_uri: str, role: str, nb_config_path: str, hyperparams_file: str = None): + """ + + Args: + train_step_name: train step name + sm_session: sagemaker session + image_uri: image uri + input_data_uri: input data url + validation_data_uri: validation data url + role: role arn + nb_config_path: local path of notebook yml configs + hyperparams_file: local path of hyperparameters file + + Returns: + + """ estimator = create_estimator(sm_session, image_uri, role, nb_config_path, hyperparams_file) return TrainingStep( name=train_step_name, @@ -98,6 +166,21 @@ def create_training_step(train_step_name: str, sm_session: Session, image_uri: s def compose_pipeline(sm_session: Session, role: str, config_yml_path: str, processing: bool = False, training: bool = False, image_uri: str = None, notebook_path: str = None, hyperparams_file=None) -> list: + """ + + Args: + sm_session: sagemaker session + role: role arn + config_yml_path: local path of notebook yml configs + processing: true IF you want to include processing step + training: true IF you want to include training step + image_uri: image uri of pushed image to sagemaker + notebook_path: local path of notebook yml configs + hyperparams_file: local path of hyperparameters file + + Returns: + list of composed steps + """ pipeline_steps = [] if processing: processing_step = create_processing_step( diff --git a/mlops_utilities/notebook_tests/test.ipynb b/mlops_utilities/notebook_tests/test.ipynb index 9633c43..8dc2882 100644 --- a/mlops_utilities/notebook_tests/test.ipynb +++ b/mlops_utilities/notebook_tests/test.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 36, + "execution_count": 1, "outputs": [], "source": [ "from sagemaker.workflow.steps import ProcessingStep\n", From 67a098ba3754c1110b06c8fc9add1691760c0a40 Mon Sep 17 00:00:00 2001 From: knikitiuk <36886708+KristinaNikitiuk@users.noreply.github.com> Date: Fri, 24 Feb 2023 17:01:52 +0300 Subject: [PATCH 4/6] clean up --- mlops_utilities/actions.py | 5 ++--- mlops_utilities/notebook_helper.py | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mlops_utilities/actions.py b/mlops_utilities/actions.py index e3a33af..266b6f4 100644 --- a/mlops_utilities/actions.py +++ b/mlops_utilities/actions.py @@ -11,8 +11,7 @@ from sagemaker.model_monitor import DataCaptureConfig from sagemaker.workflow.pipeline_context import PipelineSession -import notebook_helper -from mlops_utilities import helpers +from mlops_utilities import helpers, notebook_helper logger = logging.getLogger(__name__) @@ -146,7 +145,7 @@ def upsert_notebook_pipeline( config_yml_path=nb_yml_config, processing=True, notebook_path=notebook_path, - image_uri=None + image_uri=image_uri ) pipeline = notebook_helper.create_pipeline( diff --git a/mlops_utilities/notebook_helper.py b/mlops_utilities/notebook_helper.py index 84f927e..e9b704b 100644 --- a/mlops_utilities/notebook_helper.py +++ b/mlops_utilities/notebook_helper.py @@ -1,3 +1,4 @@ +"""Jupyter notebook helper""" import json import os From 4973d778d6db8b365b5fd40f41c5fb7e363ffd3a Mon Sep 17 00:00:00 2001 From: knikitiuk <36886708+KristinaNikitiuk@users.noreply.github.com> Date: Tue, 28 Feb 2023 19:56:13 +0300 Subject: [PATCH 5/6] split helper into several --- .../notebook_helper/image_helper.py | 58 +++++++++++++++ .../notebook_helper/processing_helper.py | 70 ++++++++++++++++++ .../notebook_helper/training_helper.py | 73 +++++++++++++++++++ 3 files changed, 201 insertions(+) create mode 100644 mlops_utilities/notebook_helper/image_helper.py create mode 100644 mlops_utilities/notebook_helper/processing_helper.py create mode 100644 mlops_utilities/notebook_helper/training_helper.py diff --git a/mlops_utilities/notebook_helper/image_helper.py b/mlops_utilities/notebook_helper/image_helper.py new file mode 100644 index 0000000..8e91265 --- /dev/null +++ b/mlops_utilities/notebook_helper/image_helper.py @@ -0,0 +1,58 @@ +"""image creation step""" +import subprocess + + +class ImageHelper: + + def __init__(self, local_image_name: str, role: str, account_id: str, region: str): + + self.img_name = local_image_name + self.role = role + self.account_id = account_id + self.region = region + + def _run_shell_cmd(self, cmd: str, error_msg: str): + """ + + Args: + cmd: terminal command + error_msg: error message + """ + try: + subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + except subprocess.CalledProcessError as exc: + raise ImageHelperError(f'ImageHelper: {error_msg}') from exc + + def tag_image(self): + """ + assign tag to local image, usually looks like that .dkr.ecr..amazonaws.com/: + """ + self._run_shell_cmd(cmd=f"docker tag {self.img_name} " + f"{self.account_id}.dkr.ecr.us-east-1.amazonaws.com/{self.img_name}:{self.img_name}", + error_msg=f'Failed to tag local image') + + def create_repository(self): + """ + login to ecr repository or create if not exists + """ + self._run_shell_cmd(cmd=f"aws ecr get-login-password --region {self.region} | docker login --username AWS --password-stdin " + f"{self.account_id}.dkr.ecr.{self.region}.amazonaws.com/{self.img_name}", + error_msg='Failed to create or login to repository') + + def push_docker_image(self): + """ + push docker image to ecr + """ + self._run_shell_cmd(cmd=f"docker push {self.account_id}.dkr.ecr.{self.region}.amazonaws.com/{self.img_name}:{self.img_name}", + error_msg='Failed to push local image to ecr') + + def create_sagemaker_image(self): + """ + create sagemaker image from ecr repository + """ + self._run_shell_cmd(cmd=f"aws sagemaker create-image --image-name {self.img_name} --role-arn {self.role}", + error_msg='Failed to create sagemaker image') + + +class ImageHelperError(Exception): + pass diff --git a/mlops_utilities/notebook_helper/processing_helper.py b/mlops_utilities/notebook_helper/processing_helper.py new file mode 100644 index 0000000..967220a --- /dev/null +++ b/mlops_utilities/notebook_helper/processing_helper.py @@ -0,0 +1,70 @@ +"""processing step helper""" +import os + +from omegaconf import OmegaConf +from sagemaker import Session +from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput +from sagemaker.sklearn import SKLearn +from sagemaker.workflow.steps import ProcessingStep + +PROCESSING_CONTAINER_DIR = "/opt/ml/processing" + + +class ProcessingHelper: + + def __init__(self, processing_step_name: str, sagemaker_session: Session, notebook_path: str, role: str, + nb_config_path: str): + self.processing_step_name = processing_step_name + self.sagemaker_session = sagemaker_session + self.notebook_path = notebook_path + self.role = role + self.nb_config_path = nb_config_path + + def _load_nb_config(self): + """ + + Args: + local path of notebook yml configs + Returns: + loaded yml configs + """ + return OmegaConf.load(self.nb_config_path) + + def _create_processor(self) -> FrameworkProcessor: + """ + Returns: + processor framework + """ + nb_config = self._load_nb_config() + return FrameworkProcessor( + estimator_cls=SKLearn, + framework_version="0.23-1", + role=self.role, + instance_count=nb_config.training.instance_count, + instance_type=nb_config.training.instance_type, + sagemaker_session=self.sagemaker_session, + ) + + def create_processing_step(self) -> ProcessingStep: + """ + Returns: + sagemaker processing job + """ + return ProcessingStep( + self.processing_step_name, + processor=self._create_processor(), + inputs=[ + ProcessingInput( + input_name="code", + source=self.notebook_path, + destination=os.path.join(PROCESSING_CONTAINER_DIR, "code"), + ), + ], + outputs=[ + ProcessingOutput( + output_name="output-data", + source=os.path.join(PROCESSING_CONTAINER_DIR, "output-data"), + ) + ], + code=os.path.join(self.notebook_path, "entrypoint.sh") + ) diff --git a/mlops_utilities/notebook_helper/training_helper.py b/mlops_utilities/notebook_helper/training_helper.py new file mode 100644 index 0000000..541720d --- /dev/null +++ b/mlops_utilities/notebook_helper/training_helper.py @@ -0,0 +1,73 @@ +"""training step helper""" +import json + +from omegaconf import OmegaConf +from sagemaker import Session, TrainingInput +from sagemaker.estimator import Estimator +from sagemaker.workflow.steps import TrainingStep + + +class TrainingHelper: + + def __init__(self, train_step_name: str, sagemaker_session: Session, image_uri: str, input_data_uri: str, + validation_data_uri: str, role: str, nb_config_path: str, hyperparams_file: str = None): + + self.train_step_name = train_step_name + self.sm_session = sagemaker_session + self.image_uri = image_uri + self.input_data_uri = input_data_uri + self.validation_data_uri = validation_data_uri + self.role = role + self.nb_config_path = nb_config_path + self.hyperparams_file = hyperparams_file + + def _load_nb_config(self): + """ + + Args: + local path of notebook yml configs + Returns: + loaded yml configs + """ + return OmegaConf.load(self.nb_config_path) + + def create_estimator(self) -> Estimator: + """ + Returns: + estimator for training job + """ + nb_config = self._load_nb_config() + if self.hyperparams_file: + with open(self.hyperparams_file, encoding='utf-8') as json_file: + hyperparams_dict = json.load(json_file) + + return Estimator( + image_uri=self.image_uri, + instance_type=nb_config.processing.instance_type, + instance_count=nb_config.processing.instance_count, + base_job_name="notebook-train", + sagemaker_session=self.sm_session, + role=self.role, + hyperparameters=hyperparams_dict + ) + + def create_training_step(self) -> TrainingStep: + """ + Returns: + training step + """ + estimator = self.create_estimator() + return TrainingStep( + name=self.train_step_name, + estimator=estimator, + inputs={ + "train": TrainingInput( + s3_data=self.input_data_uri, + content_type="text/csv", + ), + "validation": TrainingInput( + s3_data=self.validation_data_uri, + content_type="text/csv", + ), + }, + ) From abf769af6aa3f1c061f2368f7351e7f790acd10a Mon Sep 17 00:00:00 2001 From: knikitiuk <36886708+KristinaNikitiuk@users.noreply.github.com> Date: Wed, 1 Mar 2023 19:15:10 +0300 Subject: [PATCH 6/6] image helper update --- mlops_utilities/actions.py | 8 +- mlops_utilities/helpers.py | 69 ++++++ mlops_utilities/notebook_helper.py | 209 ------------------ .../notebook_helper/image_helper.py | 28 ++- 4 files changed, 92 insertions(+), 222 deletions(-) delete mode 100644 mlops_utilities/notebook_helper.py diff --git a/mlops_utilities/actions.py b/mlops_utilities/actions.py index 266b6f4..ec56a18 100644 --- a/mlops_utilities/actions.py +++ b/mlops_utilities/actions.py @@ -11,7 +11,7 @@ from sagemaker.model_monitor import DataCaptureConfig from sagemaker.workflow.pipeline_context import PipelineSession -from mlops_utilities import helpers, notebook_helper +from mlops_utilities import helpers logger = logging.getLogger(__name__) @@ -139,16 +139,16 @@ def upsert_notebook_pipeline( sm_session = Session(default_bucket='kris-mlops-utilities-test') - pipeline_steps = notebook_helper.compose_pipeline( + pipeline_steps = helpers.compose_pipeline( sm_session=sm_session, role=role, config_yml_path=nb_yml_config, - processing=True, + processing_step_name='ProcessingStep', notebook_path=notebook_path, image_uri=image_uri ) - pipeline = notebook_helper.create_pipeline( + pipeline = helpers.create_pipeline( pipeline_name=pipeline_name, sm_session=sm_session, steps=pipeline_steps, diff --git a/mlops_utilities/helpers.py b/mlops_utilities/helpers.py index 8fd0ff0..4ee12ea 100644 --- a/mlops_utilities/helpers.py +++ b/mlops_utilities/helpers.py @@ -12,6 +12,11 @@ from omegaconf import OmegaConf, dictconfig # Sagemaker dependent methods +from sagemaker import Session +from sagemaker.workflow.pipeline import Pipeline + +from mlops_utilities.notebook_helper.processing_helper import ProcessingHelper +from mlops_utilities.notebook_helper.training_helper import TrainingHelper logger = logging.getLogger(__name__) @@ -270,3 +275,67 @@ def _generate_data_capture_config( ], # both by default "CaptureContentTypeHeader": {"CsvContentTypes": ["text/csv"]}, } + + +def create_pipeline(pipeline_name: str, sm_session: Session, steps: list, pipeline_params: list) -> Pipeline: + """ + Create pipeline using list of steps, generated as a result of compose_pipeline function + Args: + pipeline_name: pipeline name + sm_session: sagemaker session + steps: list of composed steps from jupyter notebook + pipeline_params: pipeline params + + Returns: + sagemaker pipeline + """ + return Pipeline( + name=pipeline_name, + parameters=pipeline_params, + steps=steps, + sagemaker_session=sm_session, + ) + + +def compose_pipeline(sm_session: Session, role: str, config_yml_path: str, processing_step_name: str = None, + training_step_name: str = None, image_uri: str = None, notebook_path: str = None, + hyperparams_file=None) -> list: + """ + Compose list of pipeline steps. + To include processing/training step define processing/training_step_name, otherwise ignore ;) + Args: + sm_session: sagemaker session + role: role arn + config_yml_path: local path of notebook yml configs + processing_step_name: name of the processing step, IF none -> do not include processing step + training_step_name: name of the training step, IF none -> skip training step creation + image_uri: image uri of pushed image to sagemaker + notebook_path: local path of notebook yml configs + hyperparams_file: local path of hyperparameters file + + Returns: + list of composed steps + """ + pipeline_steps = [] + if processing_step_name: + processing_step = ProcessingHelper(processing_step_name=processing_step_name, + sagemaker_session=sm_session, + notebook_path=notebook_path, + role=role, + nb_config_path=config_yml_path).create_processing_step() + pipeline_steps.append(processing_step) + + if training_step_name: + training_step = TrainingHelper(train_step_name=training_step_name, + sagemaker_session=sm_session, + image_uri=image_uri, + input_data_uri=f's3://{sm_session.default_bucket()}/abalone_data/train', + validation_data_uri=f's3://{sm_session.default_bucket()}/abalone_data/test', + role=role, + nb_config_path=config_yml_path, + hyperparams_file=hyperparams_file).create_training_step() + + pipeline_steps.append(training_step) + + return pipeline_steps + diff --git a/mlops_utilities/notebook_helper.py b/mlops_utilities/notebook_helper.py deleted file mode 100644 index e9b704b..0000000 --- a/mlops_utilities/notebook_helper.py +++ /dev/null @@ -1,209 +0,0 @@ -"""Jupyter notebook helper""" -import json -import os - -from omegaconf import OmegaConf -from sagemaker import Session, TrainingInput -from sagemaker.estimator import Estimator -from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput -from sagemaker.sklearn import SKLearn -from sagemaker.workflow.pipeline import Pipeline -from sagemaker.workflow.steps import ProcessingStep, TrainingStep - -PROCESSING_CONTAINER_DIR = "/opt/ml/processing" - - -def load_nb_config(nb_config_path: str): - """ - - Args: - nb_config_path: local path of notebook yml configs - - Returns: - loaded yml configs - """ - return OmegaConf.load(nb_config_path) - - -def create_processor(sm_session: Session, role: str, nb_config_path: str) -> FrameworkProcessor: - """ - - Args: - sm_session: sagemaker session - role: role arn - nb_config_path: local path of notebook yml configs - - Returns: - - """ - nb_config = load_nb_config(nb_config_path) - return FrameworkProcessor( - estimator_cls=SKLearn, - framework_version="0.23-1", - role=role, - instance_count=nb_config.training.instance_count, - instance_type=nb_config.training.instance_type, - sagemaker_session=sm_session, - ) - - -def create_processing_step(processing_step_name: str, sm_session: Session, notebook_path: str, - role: str, nb_config_path: str) -> ProcessingStep: - """ - - Args: - processing_step_name: processing step name - sm_session: sagemaker session - notebook_path: local path of jupyter notebook - role: role arn - nb_config_path: local path of notebook yml configs - - Returns: - sagemaker processing job - """ - return ProcessingStep( - processing_step_name, - processor=create_processor(sm_session, role, nb_config_path), - inputs=[ - ProcessingInput( - input_name="code", - source=notebook_path, - destination=os.path.join(PROCESSING_CONTAINER_DIR, "code"), - ), - ], - outputs=[ - ProcessingOutput( - output_name="output-data", - source=os.path.join(PROCESSING_CONTAINER_DIR, "output-data"), - ) - ], - code=os.path.join(notebook_path, "entrypoint.sh") - ) - - -def create_pipeline(pipeline_name: str, sm_session: Session, steps: list, pipeline_params: list) -> Pipeline: - """ - - Args: - pipeline_name: pipeline name - sm_session: sagemaker session - steps: list of composed steps from jupyter notebook - pipeline_params: pipeline params - - Returns: - sagemaker pipeline - """ - return Pipeline( - name=pipeline_name, - parameters=pipeline_params, - steps=steps, - sagemaker_session=sm_session, - ) - - -def create_estimator(sm_session: Session, image_uri, role: str, nb_config_path: str, hyperparams_file: str = None): - """ - - Args: - sm_session: sagemaker session - image_uri: uri of docker image pushed to sagemaker - role: role arn - nb_config_path: local path of notebook yml configs - hyperparams_file: file with hyperparameters for model - - Returns: - estimator for training job - """ - nb_config = load_nb_config(nb_config_path) - if hyperparams_file: - with open(hyperparams_file, encoding='utf-8') as json_file: - hyperparams_dict = json.load(json_file) - - return Estimator( - image_uri=image_uri, - instance_type=nb_config.processing.instance_type, - instance_count=nb_config.processing.instance_count, - base_job_name="notebook-train", - sagemaker_session=sm_session, - role=role, - hyperparameters=hyperparams_dict - ) - - -def create_training_step(train_step_name: str, sm_session: Session, image_uri: str, input_data_uri: str, - validation_data_uri: str, role: str, nb_config_path: str, hyperparams_file: str = None): - """ - - Args: - train_step_name: train step name - sm_session: sagemaker session - image_uri: image uri - input_data_uri: input data url - validation_data_uri: validation data url - role: role arn - nb_config_path: local path of notebook yml configs - hyperparams_file: local path of hyperparameters file - - Returns: - - """ - estimator = create_estimator(sm_session, image_uri, role, nb_config_path, hyperparams_file) - return TrainingStep( - name=train_step_name, - estimator=estimator, - inputs={ - "train": TrainingInput( - s3_data=input_data_uri, - content_type="text/csv", - ), - "validation": TrainingInput( - s3_data=validation_data_uri, - content_type="text/csv", - ), - }, - ) - - -def compose_pipeline(sm_session: Session, role: str, config_yml_path: str, processing: bool = False, - training: bool = False, image_uri: str = None, notebook_path: str = None, - hyperparams_file=None) -> list: - """ - - Args: - sm_session: sagemaker session - role: role arn - config_yml_path: local path of notebook yml configs - processing: true IF you want to include processing step - training: true IF you want to include training step - image_uri: image uri of pushed image to sagemaker - notebook_path: local path of notebook yml configs - hyperparams_file: local path of hyperparameters file - - Returns: - list of composed steps - """ - pipeline_steps = [] - if processing: - processing_step = create_processing_step( - processing_step_name='processing-nb-upsert', - sm_session=sm_session, - notebook_path=notebook_path, - role=role, - nb_config_path=config_yml_path - ) - pipeline_steps.append(processing_step) - - if training: - training_step = create_training_step( - train_step_name="training-nb-upsert", - sm_session=sm_session, - image_uri=image_uri, - input_data_uri='s3://kris-mlops-utilities-test/abalone_data/train', - validation_data_uri='s3://kris-mlops-utilities-test/abalone_data/test', - role=role, - nb_config_path=config_yml_path, - hyperparams_file=hyperparams_file - ) - pipeline_steps.append(training_step) - - return pipeline_steps diff --git a/mlops_utilities/notebook_helper/image_helper.py b/mlops_utilities/notebook_helper/image_helper.py index 8e91265..f7e88fd 100644 --- a/mlops_utilities/notebook_helper/image_helper.py +++ b/mlops_utilities/notebook_helper/image_helper.py @@ -27,24 +27,30 @@ def tag_image(self): """ assign tag to local image, usually looks like that .dkr.ecr..amazonaws.com/: """ - self._run_shell_cmd(cmd=f"docker tag {self.img_name} " - f"{self.account_id}.dkr.ecr.us-east-1.amazonaws.com/{self.img_name}:{self.img_name}", - error_msg=f'Failed to tag local image') + tagged_img = f'{self.account_id}.dkr.ecr.{self.region}.amazonaws.com/{self.img_name}:{self.img_name}' + self._run_shell_cmd(cmd=f"docker tag {self.img_name} {tagged_img}", error_msg=f'Failed to tag local image') + return tagged_img - def create_repository(self): + def crate_ecr_repository(self): """ - login to ecr repository or create if not exists + create ecr repository + """ + self._run_shell_cmd(cmd=f"aws ecr create-repository --repository-name {self.img_name}", + error_msg='Failed to create ecr repository') + + def login_ecr_repository(self): + """ + login to ecr repository """ self._run_shell_cmd(cmd=f"aws ecr get-login-password --region {self.region} | docker login --username AWS --password-stdin " f"{self.account_id}.dkr.ecr.{self.region}.amazonaws.com/{self.img_name}", - error_msg='Failed to create or login to repository') + error_msg='Failed to login ecr repository') - def push_docker_image(self): + def push_docker_image(self, tagged_img): """ push docker image to ecr """ - self._run_shell_cmd(cmd=f"docker push {self.account_id}.dkr.ecr.{self.region}.amazonaws.com/{self.img_name}:{self.img_name}", - error_msg='Failed to push local image to ecr') + self._run_shell_cmd(cmd=f"docker push {tagged_img}", error_msg='Failed to push local image to ecr') def create_sagemaker_image(self): """ @@ -53,6 +59,10 @@ def create_sagemaker_image(self): self._run_shell_cmd(cmd=f"aws sagemaker create-image --image-name {self.img_name} --role-arn {self.role}", error_msg='Failed to create sagemaker image') + def create_sagemaker_image_version(self, tagged_img): + self._run_shell_cmd(cmd=f"aws sagemaker create-image-version --base-image {tagged_img}" + f" --image-name {self.img_name}", error_msg='Failed to create image version') + class ImageHelperError(Exception): pass