notebook_helper + upsert nb pipeline #18

Open · wants to merge 6 commits into base: main
42 changes: 42 additions & 0 deletions mlops_utilities/actions.py
@@ -118,6 +118,48 @@ def run_pipeline(
    return sagemaker_client.start_pipeline_execution(**start_pipe_args)


def upsert_notebook_pipeline(
        pipeline_name: str,
        notebook_path: str,
        role: str,
        nb_yml_config: str,
        pipeline_tags: Optional[Dict[str, str]] = None,
        image_uri: Optional[str] = None,
        dryrun: bool = False,
):
"""
Local file will be uploaded to S3 using default bucket (configured)
Args:
notebook_path: local path to *.ipynb file
pipeline_name: see existing `upsert_pipeline` method
image_uri: ECR image URI that is built and pushed by the project CI
pipeline_tags: see existing `upsert_pipeline` method
dryrun: see existing `upsert_pipeline` method
"""

    # NOTE: the default bucket is currently hardcoded for testing and should be made configurable
    sm_session = Session(default_bucket='kris-mlops-utilities-test')

    pipeline_steps = helpers.compose_pipeline(
        sm_session=sm_session,
        role=role,
        config_yml_path=nb_yml_config,
        processing_step_name='ProcessingStep',
        notebook_path=notebook_path,
        image_uri=image_uri,
    )

    pipeline = helpers.create_pipeline(
        pipeline_name=pipeline_name,
        sm_session=sm_session,
        steps=pipeline_steps,
        pipeline_params=[],
    )
    if not dryrun:
        if pipeline_tags is not None:
            pipeline_tags = helpers.convert_param_dict_to_key_value_list(pipeline_tags)
        pipeline.upsert(role_arn=role, tags=pipeline_tags)
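
For reviewers, a minimal usage sketch of the new entry point; the paths, names, and role ARN below are placeholders rather than values from this PR:

from mlops_utilities.actions import upsert_notebook_pipeline

# Hedged example: adjust the placeholders to your environment.
upsert_notebook_pipeline(
    pipeline_name='nb-processing-pipeline',
    notebook_path='notebook_code/',  # directory holding the notebook and entrypoint.sh
    role='arn:aws:iam::<account-id>:role/SageMakerExecutionRole',
    nb_yml_config='notebook_code/nb_config.yml',
    pipeline_tags={'team': 'mlops'},
    dryrun=True,  # compose only; skip the actual upsert to SageMaker
)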


def deploy_model(
        sagemaker_session: Session,
        model_package_group_name: str,
69 changes: 69 additions & 0 deletions mlops_utilities/helpers.py
@@ -12,6 +12,11 @@
from omegaconf import OmegaConf, dictconfig

# Sagemaker dependent methods
from sagemaker import Session
from sagemaker.workflow.pipeline import Pipeline

from mlops_utilities.notebook_helper.processing_helper import ProcessingHelper
from mlops_utilities.notebook_helper.training_helper import TrainingHelper

logger = logging.getLogger(__name__)

@@ -270,3 +275,67 @@ def _generate_data_capture_config(
        ],  # both by default
        "CaptureContentTypeHeader": {"CsvContentTypes": ["text/csv"]},
    }


def create_pipeline(pipeline_name: str, sm_session: Session, steps: list, pipeline_params: list) -> Pipeline:
    """
    Create a SageMaker pipeline from the list of steps produced by `compose_pipeline`.
    Args:
        pipeline_name: pipeline name
        sm_session: sagemaker session
        steps: list of steps composed from the Jupyter notebook
        pipeline_params: pipeline parameters

    Returns:
        sagemaker pipeline
    """
    return Pipeline(
        name=pipeline_name,
        parameters=pipeline_params,
        steps=steps,
        sagemaker_session=sm_session,
    )


def compose_pipeline(sm_session: Session, role: str, config_yml_path: str, processing_step_name: str = None,
                     training_step_name: str = None, image_uri: str = None, notebook_path: str = None,
                     hyperparams_file=None) -> list:
    """
    Compose the list of pipeline steps.
    A processing/training step is included only when the corresponding
    `processing_step_name`/`training_step_name` is provided.
    Args:
        sm_session: sagemaker session
        role: role ARN
        config_yml_path: local path of the notebook YAML config
        processing_step_name: name of the processing step; if None, the processing step is skipped
        training_step_name: name of the training step; if None, the training step is skipped
        image_uri: URI of the image pushed to ECR for SageMaker
        notebook_path: local path of the notebook code directory
        hyperparams_file: local path of the hyperparameters file

    Returns:
        list of composed steps
    """
    pipeline_steps = []
    if processing_step_name:
        processing_step = ProcessingHelper(processing_step_name=processing_step_name,
                                           sagemaker_session=sm_session,
                                           notebook_path=notebook_path,
                                           role=role,
                                           nb_config_path=config_yml_path).create_processing_step()
        pipeline_steps.append(processing_step)

    if training_step_name:
        # NOTE: train/validation locations are currently hardcoded to the abalone test dataset
        training_step = TrainingHelper(train_step_name=training_step_name,
                                       sagemaker_session=sm_session,
                                       image_uri=image_uri,
                                       input_data_uri=f's3://{sm_session.default_bucket()}/abalone_data/train',
                                       validation_data_uri=f's3://{sm_session.default_bucket()}/abalone_data/test',
                                       role=role,
                                       nb_config_path=config_yml_path,
                                       hyperparams_file=hyperparams_file).create_training_step()

        pipeline_steps.append(training_step)

    return pipeline_steps
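
Taken together, a hedged sketch of how the two helpers combine; step names, paths, and URIs are illustrative:

from sagemaker import Session
from mlops_utilities import helpers

sm_session = Session()

# Compose both steps; omit either *_step_name argument to skip that step.
steps = helpers.compose_pipeline(
    sm_session=sm_session,
    role='arn:aws:iam::<account-id>:role/SageMakerExecutionRole',
    config_yml_path='nb_config.yml',
    processing_step_name='ProcessingStep',
    training_step_name='TrainingStep',
    image_uri='<account-id>.dkr.ecr.<region>.amazonaws.com/nb-train-img:nb-train-img',
    notebook_path='notebook_code/',
    hyperparams_file='hyperparams.json',
)

pipeline = helpers.create_pipeline(
    pipeline_name='nb-pipeline',
    sm_session=sm_session,
    steps=steps,
    pipeline_params=[],
)
pipeline.upsert(role_arn='arn:aws:iam::<account-id>:role/SageMakerExecutionRole')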

68 changes: 68 additions & 0 deletions mlops_utilities/notebook_helper/image_helper.py
@@ -0,0 +1,68 @@
"""image creation step"""
import subprocess


class ImageHelper:

def __init__(self, local_image_name: str, role: str, account_id: str, region: str):

self.img_name = local_image_name
self.role = role
self.account_id = account_id
self.region = region

def _run_shell_cmd(self, cmd: str, error_msg: str):
"""

Args:
cmd: terminal command
error_msg: error message
"""
try:
subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
except subprocess.CalledProcessError as exc:
raise ImageHelperError(f'ImageHelper: {error_msg}') from exc

    def tag_image(self):
        """
        Assign an ECR tag to the local image: <account_id>.dkr.ecr.<region>.amazonaws.com/<img>:<img>
        """
        tagged_img = f'{self.account_id}.dkr.ecr.{self.region}.amazonaws.com/{self.img_name}:{self.img_name}'
        self._run_shell_cmd(cmd=f"docker tag {self.img_name} {tagged_img}", error_msg='Failed to tag local image')
        return tagged_img

    def create_ecr_repository(self):
        """
        create ecr repository
        """
        self._run_shell_cmd(cmd=f"aws ecr create-repository --repository-name {self.img_name}",
                            error_msg='Failed to create ecr repository')

    def login_ecr_repository(self):
        """
        login to the ecr registry
        """
        # docker login expects the registry host, not a repository path
        self._run_shell_cmd(cmd=f"aws ecr get-login-password --region {self.region} | docker login --username AWS --password-stdin "
                                f"{self.account_id}.dkr.ecr.{self.region}.amazonaws.com",
                            error_msg='Failed to login to ecr registry')

    def push_docker_image(self, tagged_img):
        """
        push docker image to ecr
        """
        self._run_shell_cmd(cmd=f"docker push {tagged_img}", error_msg='Failed to push local image to ecr')

    def create_sagemaker_image(self):
        """
        create sagemaker image from ecr repository
        """
        self._run_shell_cmd(cmd=f"aws sagemaker create-image --image-name {self.img_name} --role-arn {self.role}",
                            error_msg='Failed to create sagemaker image')

    def create_sagemaker_image_version(self, tagged_img):
        """
        create a new sagemaker image version from the pushed ecr image
        """
        self._run_shell_cmd(cmd=f"aws sagemaker create-image-version --base-image {tagged_img}"
                                f" --image-name {self.img_name}", error_msg='Failed to create image version')


class ImageHelperError(Exception):
    pass
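
An end-to-end sketch of the intended image flow, assuming the local Docker image already exists; the account ID and region are placeholders:

from mlops_utilities.notebook_helper.image_helper import ImageHelper

helper = ImageHelper(
    local_image_name='nb-train-img',
    role='arn:aws:iam::<account-id>:role/SageMakerExecutionRole',
    account_id='<account-id>',
    region='us-east-1',
)

helper.create_ecr_repository()                 # one-time repository setup
helper.login_ecr_repository()                  # authenticate docker with ECR
tagged = helper.tag_image()                    # tag the local image for ECR
helper.push_docker_image(tagged)               # upload the image to ECR
helper.create_sagemaker_image()                # register the SageMaker image
helper.create_sagemaker_image_version(tagged)  # attach the pushed image as a version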
70 changes: 70 additions & 0 deletions mlops_utilities/notebook_helper/processing_helper.py
@@ -0,0 +1,70 @@
"""processing step helper"""
import os

from omegaconf import OmegaConf
from sagemaker import Session
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn import SKLearn
from sagemaker.workflow.steps import ProcessingStep

PROCESSING_CONTAINER_DIR = "/opt/ml/processing"


class ProcessingHelper:
    """Builds the SageMaker processing step that runs the notebook code."""

    def __init__(self, processing_step_name: str, sagemaker_session: Session, notebook_path: str, role: str,
                 nb_config_path: str):
        self.processing_step_name = processing_step_name
        self.sagemaker_session = sagemaker_session
        self.notebook_path = notebook_path
        self.role = role
        self.nb_config_path = nb_config_path

    def _load_nb_config(self):
        """
        Load the notebook YAML config from `nb_config_path`.
        Returns:
            loaded yml configs
        """
        return OmegaConf.load(self.nb_config_path)

    def _create_processor(self) -> FrameworkProcessor:
        """
        Returns:
            framework processor for the processing step
        """
        nb_config = self._load_nb_config()
        return FrameworkProcessor(
            estimator_cls=SKLearn,
            framework_version="0.23-1",
            role=self.role,
            instance_count=nb_config.processing.instance_count,
            instance_type=nb_config.processing.instance_type,
            sagemaker_session=self.sagemaker_session,
        )

    def create_processing_step(self) -> ProcessingStep:
        """
        Returns:
            sagemaker processing step
        """
        return ProcessingStep(
            self.processing_step_name,
            processor=self._create_processor(),
            inputs=[
                ProcessingInput(
                    input_name="code",
                    source=self.notebook_path,
                    destination=os.path.join(PROCESSING_CONTAINER_DIR, "code"),
                ),
            ],
            outputs=[
                ProcessingOutput(
                    output_name="output-data",
                    source=os.path.join(PROCESSING_CONTAINER_DIR, "output-data"),
                )
            ],
            code=os.path.join(self.notebook_path, "entrypoint.sh"),
        )
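
A hedged sketch of using this helper directly; the YAML config is assumed to expose processing.instance_type and processing.instance_count keys:

from sagemaker import Session
from mlops_utilities.notebook_helper.processing_helper import ProcessingHelper

# nb_config.yml is assumed to look roughly like:
#   processing:
#     instance_type: ml.m5.large
#     instance_count: 1
step = ProcessingHelper(
    processing_step_name='ProcessingStep',
    sagemaker_session=Session(),
    notebook_path='notebook_code/',  # directory holding the notebook and entrypoint.sh
    role='arn:aws:iam::<account-id>:role/SageMakerExecutionRole',
    nb_config_path='nb_config.yml',
).create_processing_step()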
73 changes: 73 additions & 0 deletions mlops_utilities/notebook_helper/training_helper.py
@@ -0,0 +1,73 @@
"""training step helper"""
import json

from omegaconf import OmegaConf
from sagemaker import Session, TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.workflow.steps import TrainingStep


class TrainingHelper:
    """Builds the SageMaker training step for the notebook pipeline."""

    def __init__(self, train_step_name: str, sagemaker_session: Session, image_uri: str, input_data_uri: str,
                 validation_data_uri: str, role: str, nb_config_path: str, hyperparams_file: str = None):
        self.train_step_name = train_step_name
        self.sm_session = sagemaker_session
        self.image_uri = image_uri
        self.input_data_uri = input_data_uri
        self.validation_data_uri = validation_data_uri
        self.role = role
        self.nb_config_path = nb_config_path
        self.hyperparams_file = hyperparams_file

    def _load_nb_config(self):
        """
        Load the notebook YAML config from `nb_config_path`.
        Returns:
            loaded yml configs
        """
        return OmegaConf.load(self.nb_config_path)

    def create_estimator(self) -> Estimator:
        """
        Returns:
            estimator for training job
        """
        nb_config = self._load_nb_config()
        # Default to no hyperparameters so the name is bound even without a file
        hyperparams_dict = None
        if self.hyperparams_file:
            with open(self.hyperparams_file, encoding='utf-8') as json_file:
                hyperparams_dict = json.load(json_file)

        return Estimator(
            image_uri=self.image_uri,
            instance_type=nb_config.training.instance_type,
            instance_count=nb_config.training.instance_count,
            base_job_name="notebook-train",
            sagemaker_session=self.sm_session,
            role=self.role,
            hyperparameters=hyperparams_dict,
        )

    def create_training_step(self) -> TrainingStep:
        """
        Returns:
            training step
        """
        estimator = self.create_estimator()
        return TrainingStep(
            name=self.train_step_name,
            estimator=estimator,
            inputs={
                "train": TrainingInput(
                    s3_data=self.input_data_uri,
                    content_type="text/csv",
                ),
                "validation": TrainingInput(
                    s3_data=self.validation_data_uri,
                    content_type="text/csv",
                ),
            },
        )
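
A hedged sketch of building the training step on its own; the S3 and image URIs mirror the defaults used by compose_pipeline, and the config is assumed to expose training.instance_type and training.instance_count keys:

from sagemaker import Session
from mlops_utilities.notebook_helper.training_helper import TrainingHelper

session = Session()
step = TrainingHelper(
    train_step_name='TrainingStep',
    sagemaker_session=session,
    image_uri='<account-id>.dkr.ecr.<region>.amazonaws.com/nb-train-img:nb-train-img',
    input_data_uri=f's3://{session.default_bucket()}/abalone_data/train',
    validation_data_uri=f's3://{session.default_bucket()}/abalone_data/test',
    role='arn:aws:iam::<account-id>:role/SageMakerExecutionRole',
    nb_config_path='nb_config.yml',
    hyperparams_file='hyperparams.json',  # optional; omit to train without hyperparameters
).create_training_step()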
16 changes: 16 additions & 0 deletions mlops_utilities/notebook_test_processing_code/entrypoint.sh
@@ -0,0 +1,16 @@
#!/bin/bash

# Exit on any error. SageMaker uses the exit code to mark the job as failed.
set -e

cd /opt/ml/processing/code/
if [[ -f 'requirements.txt' ]]; then
    # Some py3 containers have typing pre-installed, which may break pip install
    pip uninstall --yes typing
    pip install -r requirements.txt
fi

pip install --upgrade pip ipython ipykernel
ipython kernel install --name "python3" --user

papermill processing_local_pipeline.ipynb output_processing.ipynb -p role_param arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole -p output_bucket_path kris-mlops-utilities-test
#papermill training_local_pipeline_updated.ipynb output_training.ipynb -p role_param arn:aws:iam::311638508164:role/AmazonSageMaker-ExecutionRole -p output_bucket_path kris-mlops-utilities-test
10 changes: 10 additions & 0 deletions mlops_utilities/notebook_test_processing_code/hyperparams.json
@@ -0,0 +1,10 @@
{
    "objective": "reg:linear",
    "num_round": 50,
    "max_depth": 5,
    "eta": 0.2,
    "gamma": 4,
    "min_child_weight": 6,
    "subsample": 0.7,
    "silent": 0
}