diff --git a/.github/workflows/continuous_integration.yml b/.github/workflows/continuous_integration.yml index 36cbdae7..54281408 100644 --- a/.github/workflows/continuous_integration.yml +++ b/.github/workflows/continuous_integration.yml @@ -98,7 +98,7 @@ jobs: strategy: fail-fast: false matrix: - AG_VERSION: ["source", "0.7.0"] + AG_VERSION: ["source", "1.1.0"] needs: cloud_lint_check runs-on: ubuntu-latest steps: @@ -130,7 +130,7 @@ jobs: strategy: fail-fast: false matrix: - AG_VERSION: ["source", "0.7.0"] + AG_VERSION: ["source", "1.1.0"] needs: cloud_lint_check runs-on: ubuntu-latest steps: @@ -162,7 +162,7 @@ jobs: strategy: fail-fast: false matrix: - AG_VERSION: ["source", "0.7.0"] + AG_VERSION: ["source", "1.1.0"] needs: cloud_lint_check runs-on: ubuntu-latest steps: @@ -194,7 +194,7 @@ jobs: strategy: fail-fast: false matrix: - AG_VERSION: ["source", "0.7.0"] + AG_VERSION: ["source", "1.1.0"] needs: cloud_lint_check runs-on: ubuntu-latest steps: @@ -226,7 +226,7 @@ jobs: strategy: fail-fast: false matrix: - AG_VERSION: ["source", "0.7.0"] + AG_VERSION: ["source", "1.1.0"] needs: cloud_lint_check runs-on: ubuntu-latest steps: diff --git a/src/autogluon/cloud/backend/ray_backend.py b/src/autogluon/cloud/backend/ray_backend.py index 491c5a3b..acdd7efd 100644 --- a/src/autogluon/cloud/backend/ray_backend.py +++ b/src/autogluon/cloud/backend/ray_backend.py @@ -384,7 +384,7 @@ def _get_image_uri(self, framework_version: str, instance_type: str, custom_imag logger.log(20, f"Training with custom_image_uri=={custom_image_uri}") else: framework_version, py_version = parse_framework_version( - framework_version, "training", minimum_version="0.7.0" + framework_version, "training", minimum_version="1.0.0" ) logger.log(20, f"Training with framework_version=={framework_version}") image_uri = image_uris.retrieve( diff --git a/src/autogluon/cloud/backend/sagemaker_backend.py b/src/autogluon/cloud/backend/sagemaker_backend.py index 5bab5c38..b5a1e3a5 100644 --- 
a/src/autogluon/cloud/backend/sagemaker_backend.py +++ b/src/autogluon/cloud/backend/sagemaker_backend.py @@ -558,7 +558,6 @@ def predict_real_time( test_data_image_column: Optional[str] = None, accept: str = "application/x-parquet", inference_kwargs: Optional[Dict[str, Any]] = None, - **kwargs, ) -> Union[pd.DataFrame, pd.Series]: """ Predict with the deployed SageMaker endpoint. A deployed SageMaker endpoint is required. @@ -595,7 +594,6 @@ def predict_proba_real_time( test_data_image_column: Optional[str] = None, accept: str = "application/x-parquet", inference_kwargs: Optional[Dict[str, Any]] = None, - **kwargs, ) -> Union[pd.DataFrame, pd.Series]: """ Predict probability with the deployed SageMaker endpoint. A deployed SageMaker endpoint is required. @@ -704,6 +702,7 @@ def predict( instance_count: int = 1, custom_image_uri: Optional[str] = None, wait: bool = True, + inference_kwargs: Optional[Dict[str, Any]] = None, download: bool = True, persist: bool = True, save_path: Optional[str] = None, @@ -783,6 +782,7 @@ def predict( instance_count=instance_count, custom_image_uri=custom_image_uri, wait=wait, + inference_kwargs=inference_kwargs, download=download, persist=persist, save_path=save_path, @@ -805,6 +805,7 @@ def predict_proba( instance_count: int = 1, custom_image_uri: Optional[str] = None, wait: bool = True, + inference_kwargs: Optional[Dict[str, Any]] = None, download: bool = True, persist: bool = True, save_path: Optional[str] = None, @@ -889,6 +890,7 @@ def predict_proba( instance_count=instance_count, custom_image_uri=custom_image_uri, wait=wait, + inference_kwargs=inference_kwargs, download=download, persist=persist, save_path=save_path, @@ -1133,6 +1135,7 @@ def _predict( instance_count=1, custom_image_uri=None, wait=True, + inference_kwargs=None, download=True, persist=True, save_path=None, @@ -1256,6 +1259,7 @@ def _predict( transformer_kwargs=transformer_kwargs, model_kwargs=model_kwargs, repack_model=repack_model, + 
inference_kwargs=inference_kwargs, **transform_kwargs, ) self._batch_transform_jobs[job_name] = batch_transform_job diff --git a/src/autogluon/cloud/backend/timeseries_sagemaker_backend.py b/src/autogluon/cloud/backend/timeseries_sagemaker_backend.py index 3bee7398..2339e9fb 100644 --- a/src/autogluon/cloud/backend/timeseries_sagemaker_backend.py +++ b/src/autogluon/cloud/backend/timeseries_sagemaker_backend.py @@ -15,9 +15,9 @@ class TimeSeriesSagemakerBackend(SagemakerBackend): def _preprocess_data( self, data: Union[pd.DataFrame, str], - id_column: str, - timestamp_column: str, - target: str, + id_column: Optional[str] = None, + timestamp_column: Optional[str] = None, + target: Optional[str] = None, static_features: Optional[Union[pd.DataFrame, str]] = None, ) -> pd.DataFrame: if isinstance(data, str): @@ -27,12 +27,15 @@ def _preprocess_data( cols = data.columns.to_list() # Make sure id and timestamp columns are the first two columns, and target column is in the end # This is to ensure in the container we know how to find id and timestamp columns, and whether there are static features being merged - timestamp_index = cols.index(timestamp_column) - cols.insert(0, cols.pop(timestamp_index)) - id_index = cols.index(id_column) - cols.insert(0, cols.pop(id_index)) - target_index = cols.index(target) - cols.append(cols.pop(target_index)) + if timestamp_column is not None: + timestamp_index = cols.index(timestamp_column) + cols.insert(0, cols.pop(timestamp_index)) + if id_column is not None: + id_index = cols.index(id_column) + cols.insert(0, cols.pop(id_index)) + if target is not None: + target_index = cols.index(target) + cols.append(cols.pop(target_index)) data = data[cols] if static_features is not None: @@ -48,8 +51,8 @@ def fit( *, predictor_init_args: Dict[str, Any], predictor_fit_args: Dict[str, Any], - id_column: str, - timestamp_column: str, + id_column: Optional[str] = None, + timestamp_column: Optional[str] = None, static_features: Optional[Union[str, 
pd.DataFrame]] = None, framework_version: str = "latest", job_name: Optional[str] = None, @@ -199,9 +202,9 @@ def predict_proba_real_time(self, **kwargs) -> pd.DataFrame: def predict( self, test_data: Union[str, pd.DataFrame], - id_column: str, - timestamp_column: str, - target: str, + id_column: Optional[str] = None, + timestamp_column: Optional[str] = None, + target: Optional[str] = None, static_features: Optional[Union[str, pd.DataFrame]] = None, **kwargs, ) -> Optional[pd.DataFrame]: diff --git a/src/autogluon/cloud/default_cluster_configs/ray_aws_default_cluster_config.yaml b/src/autogluon/cloud/default_cluster_configs/ray_aws_default_cluster_config.yaml index e0ea39f7..d64b784a 100644 --- a/src/autogluon/cloud/default_cluster_configs/ray_aws_default_cluster_config.yaml +++ b/src/autogluon/cloud/default_cluster_configs/ray_aws_default_cluster_config.yaml @@ -6,7 +6,7 @@ max_workers: 1 docker: # The image uri will be dynamically replaced by cloud predictor - image: "763104351884.dkr.ecr.us-east-1.amazonaws.com/autogluon-training:0.7.0-cpu-py39-ubuntu20.04" + image: "763104351884.dkr.ecr.us-east-1.amazonaws.com/autogluon-training:1.0.0-cpu-py39-ubuntu20.04" container_name: "ag_dlc" # Cloud-provider specific configuration. 
diff --git a/src/autogluon/cloud/job/sagemaker_job.py b/src/autogluon/cloud/job/sagemaker_job.py index edc09aed..bc88965e 100644 --- a/src/autogluon/cloud/job/sagemaker_job.py +++ b/src/autogluon/cloud/job/sagemaker_job.py @@ -10,6 +10,7 @@ AutoGluonSagemakerEstimator, ) from ..utils.constants import LOCAL_MODE, LOCAL_MODE_GPU, MODEL_ARTIFACT_NAME +from ..utils.utils import serialize_kwargs from .remote_job import RemoteJob logger = logging.getLogger(__name__) @@ -257,6 +258,7 @@ def run( model_kwargs, transformer_kwargs, repack_model=False, + inference_kwargs=None, **kwargs, ): self._local_mode = instance_type in (LOCAL_MODE, LOCAL_MODE_GPU) @@ -265,6 +267,10 @@ def run( else: model_cls = AutoGluonNonRepackInferenceModel logger.log(20, "Creating inference model...") + inference_kwargs_str = serialize_kwargs(inference_kwargs) if inference_kwargs is not None else None + env = {} + if inference_kwargs_str: + env["inference_kwargs"] = inference_kwargs_str model = model_cls( model_data=model_data, role=role, @@ -275,6 +281,7 @@ def run( custom_image_uri=custom_image_uri, entry_point=entry_point, predictor_cls=predictor_cls, + env=env, **model_kwargs, ) logger.log(20, "Inference model created successfully") diff --git a/src/autogluon/cloud/predictor/cloud_predictor.py b/src/autogluon/cloud/predictor/cloud_predictor.py index 55a749a2..8fc00181 100644 --- a/src/autogluon/cloud/predictor/cloud_predictor.py +++ b/src/autogluon/cloud/predictor/cloud_predictor.py @@ -541,7 +541,7 @@ def predict_proba_real_time( """ self._validate_inference_kwargs(inference_kwargs=kwargs) return self.backend.predict_proba_real_time( - test_data=test_data, test_data_image_column=test_data_image_column, accept=accept + test_data=test_data, test_data_image_column=test_data_image_column, accept=accept, inference_kwargs=kwargs ) def predict( self, test_data: Union[str, pd.DataFrame], @@ -556,6 +556,7 @@ def predict( custom_image_uri: Optional[str] = None, wait: bool = True, backend_kwargs: Optional[Dict] = None, + **kwargs, ) -> 
Optional[pd.Series]: """ Batch inference. @@ -632,6 +633,7 @@ def predict( instance_count=instance_count, custom_image_uri=custom_image_uri, wait=wait, + inference_kwargs=kwargs, **backend_kwargs, ) @@ -648,6 +650,7 @@ def predict_proba( custom_image_uri: Optional[str] = None, wait: bool = True, backend_kwargs: Optional[Dict] = None, + **kwargs, ) -> Optional[Union[Tuple[pd.Series, Union[pd.DataFrame, pd.Series]], Union[pd.DataFrame, pd.Series]]]: """ Batch inference @@ -730,6 +733,7 @@ def predict_proba( instance_count=instance_count, custom_image_uri=custom_image_uri, wait=wait, + inference_kwargs=kwargs, **backend_kwargs, ) diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index e5018e94..449f7b48 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -50,8 +50,8 @@ def fit( *, predictor_init_args: Dict[str, Any], predictor_fit_args: Dict[str, Any], - id_column: str = "item_id", - timestamp_column: str = "timestamp", + id_column: Optional[str] = None, + timestamp_column: Optional[str] = None, static_features: Optional[Union[str, pd.DataFrame]] = None, framework_version: str = "latest", job_name: Optional[str] = None, @@ -120,7 +120,7 @@ def fit( if backend_kwargs is None: backend_kwargs = {} - self.target_column = predictor_init_args.get("target", "target") + self.target_column = predictor_init_args.get("target") self.id_column = id_column self.timestamp_column = timestamp_column @@ -146,6 +146,9 @@ def fit( def predict_real_time( self, test_data: Union[str, pd.DataFrame], + id_column: Optional[str] = None, + timestamp_column: Optional[str] = None, + target: Optional[str] = None, static_features: Optional[Union[str, pd.DataFrame]] = None, accept: str = "application/x-parquet", **kwargs, @@ -175,6 +178,10 @@ def predict_real_time( Pandas.DataFrame Predict results in DataFrame """ + 
self.id_column = id_column or self.id_column + self.timestamp_column = timestamp_column or self.timestamp_column + self.target_column = target or self.target_column + return self.backend.predict_real_time( test_data=test_data, id_column=self.id_column, @@ -182,6 +189,7 @@ def predict_real_time( target=self.target_column, static_features=static_features, accept=accept, + inference_kwargs=kwargs, ) def predict_proba_real_time(self, **kwargs) -> pd.DataFrame: @@ -190,6 +198,9 @@ def predict_proba_real_time(self, **kwargs) -> pd.DataFrame: def predict( self, test_data: Union[str, pd.DataFrame], + id_column: Optional[str] = None, + timestamp_column: Optional[str] = None, + target: Optional[str] = None, static_features: Optional[Union[str, pd.DataFrame]] = None, predictor_path: Optional[str] = None, framework_version: str = "latest", @@ -199,6 +210,7 @@ def predict( custom_image_uri: Optional[str] = None, wait: bool = True, backend_kwargs: Optional[Dict] = None, + **kwargs, ) -> Optional[pd.DataFrame]: """ Predict using SageMaker batch transform. @@ -263,6 +275,10 @@ def predict( Please refer to https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html#sagemaker.transformer.Transformer.transform for all options. 
""" + self.id_column = id_column or self.id_column + self.timestamp_column = timestamp_column or self.timestamp_column + self.target_column = target or self.target_column + if backend_kwargs is None: backend_kwargs = {} backend_kwargs = self.backend.parse_backend_predict_kwargs(backend_kwargs) @@ -279,6 +295,7 @@ def predict( instance_count=instance_count, custom_image_uri=custom_image_uri, wait=wait, + inference_kwargs=kwargs, **backend_kwargs, ) diff --git a/src/autogluon/cloud/scripts/sagemaker_scripts/tabular_serve.py b/src/autogluon/cloud/scripts/sagemaker_scripts/tabular_serve.py index ecff56b2..ab97efd7 100644 --- a/src/autogluon/cloud/scripts/sagemaker_scripts/tabular_serve.py +++ b/src/autogluon/cloud/scripts/sagemaker_scripts/tabular_serve.py @@ -1,8 +1,11 @@ # flake8: noqa import base64 import hashlib +import json +import logging import os import pickle +import sys from io import BytesIO, StringIO import pandas as pd @@ -13,6 +16,8 @@ from autogluon.tabular import TabularPredictor image_dir = os.path.join("/tmp", "ag_images") +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) def _save_image_and_update_dataframe_column(bytes): @@ -29,17 +34,48 @@ def _save_image_and_update_dataframe_column(bytes): return im_path +def _custom_json_deserializer(serialized_str): + """ + Deserialize the JSON string that may include representations of complex data types like DataFrames + """ + base_dict = json.loads(serialized_str) + + deserialized_kwargs = {} + for key, value in base_dict.items(): + if isinstance(value, str): + try: + value = json.loads(value) + if isinstance(value, list) and all(isinstance(item, dict) for item in value): + deserialized_kwargs[key] = pd.DataFrame(value) + else: + deserialized_kwargs[key] = value + except json.JSONDecodeError: + deserialized_kwargs[key] = value + else: + deserialized_kwargs[key] = value + + return deserialized_kwargs + + def model_fn(model_dir): """loads model from previously 
saved artifact""" - model = TabularPredictor.load(model_dir) - model.persist_models() - globals()["column_names"] = model.original_features + logger.info("Loading the model") + try: + model = TabularPredictor.load(model_dir) + model.persist_models() + globals()["column_names"] = model.original_features + except Exception as e: + logger.error(f"Error loading the model: {str(e)}") + raise e return model def transform_fn(model, request_body, input_content_type, output_content_type="application/json"): - inference_kwargs = {} + inference_kwargs = os.environ.get("inference_kwargs", {}) + if inference_kwargs: + inference_kwargs = _custom_json_deserializer(inference_kwargs) + if input_content_type == "application/x-parquet": buf = BytesIO(request_body) data = pd.read_parquet(buf) @@ -60,9 +96,7 @@ def transform_fn(model, request_body, input_content_type, output_content_type="a buf = bytes(request_body) payload = pickle.loads(buf) data = pd.read_parquet(BytesIO(payload["data"])) - inference_kwargs = payload["inference_kwargs"] - if inference_kwargs is None: - inference_kwargs = {} + inference_kwargs = payload.get("inference_kwargs") or {} else: raise ValueError(f"{input_content_type} input content type not supported.") diff --git a/src/autogluon/cloud/scripts/sagemaker_scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/sagemaker_scripts/timeseries_serve.py index ec0fb311..b6b912c1 100644 --- a/src/autogluon/cloud/scripts/sagemaker_scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/sagemaker_scripts/timeseries_serve.py @@ -1,13 +1,18 @@ # flake8: noqa +import logging import os import pickle import shutil +import sys from io import BytesIO, StringIO import pandas as pd from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor +logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logger = logging.getLogger(__name__) + def model_fn(model_dir): """loads model from previously saved artifact""" @@ -31,12 +36,15 @@ 
def prepare_timeseries_dataframe(df, predictor): target = predictor.target cols = df.columns.to_list() + logger.info(f"COLUMN {cols}") id_column = cols[0] timestamp_column = cols[1] df[timestamp_column] = pd.to_datetime(df[timestamp_column]) static_features = None if target != cols[-1]: # target is not the last column, then there are static features being merged in + logger.info(f"Inside condition: {cols}, {target}") + logger.info(f"Inside condition: {cols}, {target}") target_index = cols.index(target) static_columns = cols[target_index + 1 :] static_features = df[[id_column] + static_columns].groupby([id_column], sort=False).head(1) @@ -56,6 +64,7 @@ def transform_fn(model, request_body, input_content_type, output_content_type="a elif input_content_type == "text/csv": buf = StringIO(request_body) + logger.info(f"request body data path: {buf}") data = pd.read_csv(buf) elif input_content_type == "application/json": @@ -77,6 +86,8 @@ def transform_fn(model, request_body, input_content_type, output_content_type="a else: raise ValueError(f"{input_content_type} input content type not supported.") + logger.info(f"Model is: {model}") + logger.info(f"Columns are: {data.columns}") data = prepare_timeseries_dataframe(data, model) prediction = model.predict(data, **inference_kwargs) prediction = pd.DataFrame(prediction) diff --git a/src/autogluon/cloud/scripts/sagemaker_scripts/train.py b/src/autogluon/cloud/scripts/sagemaker_scripts/train.py index 54ae9bef..28a57a14 100644 --- a/src/autogluon/cloud/scripts/sagemaker_scripts/train.py +++ b/src/autogluon/cloud/scripts/sagemaker_scripts/train.py @@ -31,13 +31,13 @@ def get_env_if_present(name): def prepare_timeseries_dataframe(df, predictor_init_args): - target = predictor_init_args["target"] + target = predictor_init_args.get("target") cols = df.columns.to_list() id_column = cols[0] timestamp_column = cols[1] df[timestamp_column] = pd.to_datetime(df[timestamp_column]) static_features = None - if target != cols[-1]: + if target 
is not None and target != cols[-1]: # target is not the last column, then there are static features being merged in target_index = cols.index(target) static_columns = cols[target_index + 1 :] @@ -46,7 +46,6 @@ def prepare_timeseries_dataframe(df, predictor_init_args): df.drop(columns=static_columns, inplace=True) df = TimeSeriesDataFrame.from_data_frame(df, id_column=id_column, timestamp_column=timestamp_column) if static_features is not None: - print(static_features) df.static_features = static_features return df diff --git a/src/autogluon/cloud/utils/utils.py b/src/autogluon/cloud/utils/utils.py index 6ace07b1..ea12fa7b 100644 --- a/src/autogluon/cloud/utils/utils.py +++ b/src/autogluon/cloud/utils/utils.py @@ -1,5 +1,6 @@ import base64 import copy +import json import logging import os import shutil @@ -7,6 +8,7 @@ import zipfile from datetime import datetime, timezone +import pandas as pd import PIL from PIL import Image @@ -91,3 +93,20 @@ class <=50K_proba >50K_proba def get_utc_timestamp_now(): return datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + + +def _custom_json_serializer(obj): + print(obj) + if isinstance(obj, pd.DataFrame): + return obj.to_dict(orient="records") + raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") + + +def serialize_kwargs(kwargs): + serialized_kwargs = {} + for key, value in kwargs.items(): + try: + serialized_kwargs[key] = json.dumps(value, default=_custom_json_serializer) + except TypeError as e: + logger.error(f"Error serializing {key}: {e}") + return json.dumps(serialized_kwargs)