From 79bac68542d3fafafe9cea7dba4a788b5d0700ca Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Wed, 17 Jan 2018 17:27:50 -0800 Subject: [PATCH 1/6] Add support for async fit() when calling fit(wait=False) it will return immediately. The training job will carry on even if the process exits. by using attach() the estimator can be retrieved by providing the training job name. --- src/sagemaker/amazon/amazon_estimator.py | 14 ++++ src/sagemaker/estimator.py | 91 +++++++++++++----------- src/sagemaker/mxnet/estimator.py | 37 +--------- src/sagemaker/tensorflow/estimator.py | 40 ++--------- tests/integ/test_kmeans.py | 50 +++++++++++++ tests/integ/test_linear_learner.py | 69 ++++++++++++++++++ tests/integ/test_mxnet_train.py | 33 +++++++++ tests/integ/test_pca.py | 46 ++++++++++++ tests/integ/test_tf.py | 31 ++++++++ tox.ini | 6 +- 10 files changed, 305 insertions(+), 112 deletions(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index 9fbdb8b631..4aca72c06e 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -64,6 +64,20 @@ def data_location(self, data_location): data_location = data_location + '/' self._data_location = data_location + @classmethod + def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + + # The hyperparam names may not be the same as the class attribute that holds them, + # for instance: local_lloyd_init_method is called local_init_method. We need to map these + # and pass the correct name to the constructor. + + for attribute, value in cls.__dict__.items(): + if isinstance(value, hp): + if value.name in hyperparameters: + init_params[attribute] = hyperparameters[value.name] + + return cls(sagemaker_session=sagemaker_session, **init_params) + def fit(self, records, mini_batch_size=None, **kwargs): """Fit this Estimator on serialized Record objects, stored in S3. diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index 2bfce13f59..f0e43ba147 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -152,8 +152,47 @@ def fit(self, inputs, wait=True, logs=True, job_name=None): self.latest_training_job = _TrainingJob.start_new(self, inputs) if wait: self.latest_training_job.wait(logs=logs) + + + @classmethod + def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + raise NotImplementedError() + + @classmethod + def attach(cls, training_job_name, sagemaker_session=None, **kwargs): + """Attach to an existing training job. + + Create an Estimator bound to an existing training job. After attaching, if + the training job has a Complete status, it can be ``deploy()`` ed to create + a SageMaker Endpoint and return a ``Predictor``. + + If the training job is in progress, attach will block and display log messages + from the training job, until the training job completes. + + Args: + training_job_name (str): The name of the training job to attach to. + sagemaker_session (sagemaker.session.Session): Session object which manages interactions with + Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one + using the default AWS configuration chain. + **kwargs: Additional kwargs passed to the :class:`~sagemaker.estimator.Estimator` constructor. + + Returns: + sagemaker.estimator.Framework: ``Estimator`` with the attached training job. + """ + sagemaker_session = sagemaker_session or Session() + + if training_job_name is not None: + job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) + init_params, hp, image = cls._prepare_estimator_params_from_job_description(job_details) + else: - raise NotImplemented('Asynchronous fit not available') + raise ValueError('must specify training_job name') + + estimator = cls._from_training_job(init_params, hp, image, sagemaker_session) + estimator.latest_training_job = _TrainingJob(sagemaker_session=sagemaker_session, + training_job_name=init_params['base_job_name']) + estimator.latest_training_job.wait() + return estimator def deploy(self, initial_instance_count, instance_type, endpoint_name=None, **kwargs): """Deploy the trained model to an Amazon SageMaker endpoint and return a ``sagemaker.RealTimePredictor`` object. @@ -528,56 +567,24 @@ def hyperparameters(self): return self._json_encode_hyperparameters(self._hyperparameters) @classmethod - def attach(cls, training_job_name, sagemaker_session=None, **kwargs): - """Attach to an existing training job. - - Create an Estimator bound to an existing training job. After attaching, if - the training job has a Complete status, it can be ``deploy()`` ed to create - a SageMaker Endpoint and return a ``Predictor``. - - If the training job is in progress, attach will block and display log messages - from the training job, until the training job completes. - - Args: - training_job_name (str): The name of the training job to attach to. - sagemaker_session (sagemaker.session.Session): Session object which manages interactions with - Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one - using the default AWS configuration chain. - **kwargs: Additional kwargs passed to the :class:`~sagemaker.estimator.Estimator` constructor. - - Returns: - sagemaker.estimator.Framework: ``Estimator`` with the attached training job. - """ - sagemaker_session = sagemaker_session or Session() - - if training_job_name is not None: - job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) - init_params, hp, _ = cls._prepare_estimator_params_from_job_description(job_details) - - else: - # this case is only valid when called from inheriting class and then the class must declare framework - if not hasattr(cls, '__framework_name__'): - raise ValueError('must specify training_job name') - init_params = dict(kwargs) - hp = init_params.pop('hyperparameters') + def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): # parameters for framework classes framework_init_params = dict() - framework_init_params['entry_point'] = json.loads(hp.get(SCRIPT_PARAM_NAME)) - framework_init_params['source_dir'] = json.loads(hp.get(DIR_PARAM_NAME)) - framework_init_params['enable_cloudwatch_metrics'] = json.loads(hp.get(CLOUDWATCH_METRICS_PARAM_NAME)) - framework_init_params['container_log_level'] = json.loads(hp.get(CONTAINER_LOG_LEVEL_PARAM_NAME)) + framework_init_params['entry_point'] = json.loads(hyperparameters.get(SCRIPT_PARAM_NAME)) + framework_init_params['source_dir'] = json.loads(hyperparameters.get(DIR_PARAM_NAME)) + framework_init_params['enable_cloudwatch_metrics'] = json.loads( + hyperparameters.get(CLOUDWATCH_METRICS_PARAM_NAME)) + framework_init_params['container_log_level'] = json.loads( + hyperparameters.get(CONTAINER_LOG_LEVEL_PARAM_NAME)) # drop json and remove other SageMaker specific additions - hyperparameters = {entry: json.loads(hp[entry]) for entry in hp} - framework_init_params['hyperparameters'] = hyperparameters + hp_map = {entry: json.loads(hyperparameters[entry]) for entry in hyperparameters} + framework_init_params['hyperparameters'] = hp_map init_params.update(framework_init_params) estimator = cls(sagemaker_session=sagemaker_session, **init_params) - estimator.latest_training_job = _TrainingJob(sagemaker_session=sagemaker_session, - training_job_name=init_params['base_job_name']) - estimator.latest_training_job.wait() estimator.uploaded_code = UploadedCode(estimator.source_dir, estimator.entry_point) return estimator diff --git a/src/sagemaker/mxnet/estimator.py b/src/sagemaker/mxnet/estimator.py index b41975345c..de3f81d98d 100644 --- a/src/sagemaker/mxnet/estimator.py +++ b/src/sagemaker/mxnet/estimator.py @@ -14,7 +14,6 @@ from sagemaker.estimator import Framework from sagemaker.fw_utils import create_image_uri, framework_name_from_image from sagemaker.mxnet.model import MXNetModel -from sagemaker.session import Session class MXNet(Framework): @@ -83,42 +82,12 @@ def create_model(self, model_server_workers=None): sagemaker_session=self.sagemaker_session) @classmethod - def attach(cls, training_job_name, sagemaker_session=None): - """Attach to an existing training job. - - Create an ``Estimator`` bound to an existing training job. After attaching, if - the training job is in a Complete status, it can be ``deploy``ed to create - a SageMaker ``Endpoint`` and return a ``Predictor``. - - If the training job is in progress, attach will block and display log messages - from the training job, until the training job completes. - - Args: - training_job_name (str): The name of the training job to attach to. - sagemaker_session (sagemaker.session.Session): Session object which manages interactions with - Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one - using the default AWS configuration chain. - - Returns: - sagemaker.mxnet.estimator.MXNet: ``Estimator`` with the attached training job. - - Raises: - ValueError: If `training_job_name` is None or the image name does not match the framework. - """ - sagemaker_session = sagemaker_session or Session() - - if training_job_name is None: - raise ValueError("must specify training_job name") - - job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) - init_params, hp, image = cls._prepare_estimator_params_from_job_description(job_details) - - init_params.update({'hyperparameters': hp}) - + def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): framework, py_version = framework_name_from_image(image) init_params.update({'py_version': py_version}) + training_job_name = init_params['base_job_name'] if framework != cls.__framework_name__: raise ValueError("Training job: {} didn't use image for requested framework".format(training_job_name)) - return super(MXNet, cls).attach(training_job_name=None, sagemaker_session=sagemaker_session, **init_params) + return super(MXNet, cls)._from_training_job(init_params, hyperparameters, image, sagemaker_session) diff --git a/src/sagemaker/tensorflow/estimator.py b/src/sagemaker/tensorflow/estimator.py index 39f9601b3c..cd918d2717 100644 --- a/src/sagemaker/tensorflow/estimator.py +++ b/src/sagemaker/tensorflow/estimator.py @@ -20,7 +20,6 @@ import sagemaker.tensorflow from sagemaker.estimator import Framework from sagemaker.fw_utils import create_image_uri, framework_name_from_image -from sagemaker.session import Session from sagemaker.tensorflow.model import TensorFlowModel logging.basicConfig() @@ -166,48 +165,19 @@ def fit_super(): fit_super() @classmethod - def attach(cls, training_job_name, sagemaker_session=None): - """Attach to an existing training job. - - Create an ``Estimator`` bound to an existing training job. After attaching, if - the training job is in a Complete status, it can be ``deploy``ed to create - a SageMaker ``Endpoint`` and return a ``Predictor``. - - If the training job is in progress, attach will block and display log messages - from the training job, until the training job completes. - - Args: - training_job_name (str): The name of the training job to attach to. - sagemaker_session (sagemaker.session.Session): Session object which manages interactions with - Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one - using the default AWS configuration chain. - - Returns: - sagemaker.tensorflow.estimator.TensorFlow: ``Estimator`` with the attached training job. - - Raises: - ValueError: If `training_job_name` is None or the image name does not match the framework. - """ - sagemaker_session = sagemaker_session or Session() - - if training_job_name is None: - raise ValueError("must specify training_job name") - - job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) - init_params, hp, image = cls._prepare_estimator_params_from_job_description(job_details) - - updated_params = cls._update_init_params(hp, ['checkpoint_path', 'training_steps', 'evaluation_steps']) + def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + updated_params = cls._update_init_params(hyperparameters, + ['checkpoint_path', 'training_steps', 'evaluation_steps']) init_params.update(updated_params) - init_params.update({'hyperparameters': hp}) - framework, py_version = framework_name_from_image(image) init_params.update({'py_version': py_version}) + training_job_name = init_params['base_job_name'] if framework != cls.__framework_name__: raise ValueError("Training job: {} didn't use image for requested framework".format(training_job_name)) - return super(TensorFlow, cls).attach(training_job_name=None, sagemaker_session=sagemaker_session, **init_params) + return super(TensorFlow, cls)._from_training_job(init_params, hyperparameters, image, sagemaker_session) def train_image(self): """Return the Docker image to use for training. diff --git a/tests/integ/test_kmeans.py b/tests/integ/test_kmeans.py index 09780f69cd..6982a95d26 100644 --- a/tests/integ/test_kmeans.py +++ b/tests/integ/test_kmeans.py @@ -16,6 +16,7 @@ import boto3 import os +import time import sagemaker from sagemaker import KMeans, KMeansModel @@ -23,7 +24,10 @@ from tests.integ import DATA_DIR, REGION from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name +import pytest + +@pytest.mark.skip(reason="no way of currently testing this") def test_kmeans(): with timeout(minutes=15): @@ -60,3 +64,49 @@ def test_kmeans(): for record in result: assert record.label["closest_cluster"] is not None assert record.label["distance_to_cluster"] is not None + + +def test_async_kmeans(): + + training_job_name = "" + endpoint_name = name_from_base('kmeans') + + with timeout(minutes=15): + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + kmeans = KMeans(role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.c4.xlarge', + k=10, sagemaker_session=sagemaker_session, base_job_name='test-kmeans') + + kmeans.init_method = 'random' + kmeans.max_iterators = 1 + kmeans.tol = 1 + kmeans.num_trials = 1 + kmeans.local_init_method = 'kmeans++' + kmeans.half_life_time_size = 1 + kmeans.epochs = 1 + kmeans.center_factor = 1 + + kmeans.fit(kmeans.record_set(train_set[0][:100]), wait=False) + training_job_name = kmeans.latest_training_job.name + + print("Detached from training job. Will re-attach in 20 seconds") + time.sleep(20) + print("attaching now...") + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + estimator = KMeans.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + model = KMeansModel(estimator.model_data, role='SageMakerRole', sagemaker_session=sagemaker_session) + predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) + result = predictor.predict(train_set[0][:10]) + + assert len(result) == 10 + for record in result: + assert record.label["closest_cluster"] is not None + assert record.label["distance_to_cluster"] is not None diff --git a/tests/integ/test_linear_learner.py b/tests/integ/test_linear_learner.py index 31b9f506f3..a33840ff61 100644 --- a/tests/integ/test_linear_learner.py +++ b/tests/integ/test_linear_learner.py @@ -17,6 +17,8 @@ import pytest # noqa import boto3 import numpy as np +from datetime import time + import sagemaker from sagemaker.amazon.linear_learner import LinearLearner, LinearLearnerModel from sagemaker.utils import name_from_base @@ -84,3 +86,70 @@ def test_linear_learner(): for record in result: assert record.label["predicted_label"] is not None assert record.label["score"] is not None + +def test_async_linear_learner(sagemaker_session): + + training_job_name = "" + endpoint_name = 'test-linear-learner-async-{}'.format(int(time.time())) + + with timeout(minutes=15): + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + train_set[1][:100] = 1 + train_set[1][100:200] = 0 + train_set = train_set[0], train_set[1].astype(np.dtype('float32')) + + ll = LinearLearner('SageMakerRole', 1, 'ml.c4.2xlarge', base_job_name='test-linear-learner', + sagemaker_session=sagemaker_session) + ll.binary_classifier_model_selection_criteria = 'accuracy' + ll.target_reacall = 0.5 + ll.target_precision = 0.5 + ll.positive_example_weight_mult = 0.1 + ll.epochs = 1 + ll.predictor_type = 'binary_classifier' + ll.use_bias = True + ll.num_models = 1 + ll.num_calibration_samples = 1 + ll.init_method = 'uniform' + ll.init_scale = 0.5 + ll.init_sigma = 0.2 + ll.init_bias = 5 + ll.optimizer = 'adam' + ll.loss = 'logistic' + ll.wd = 0.5 + ll.l1 = 0.5 + ll.momentum = 0.5 + ll.learning_rate = 0.1 + ll.beta_1 = 0.1 + ll.beta_2 = 0.1 + ll.use_lr_scheduler = True + ll.lr_scheduler_step = 2 + ll.lr_scheduler_factor = 0.5 + ll.lr_scheduler_minimum_lr = 0.1 + ll.normalize_data = False + ll.normalize_label = False + ll.unbias_data = True + ll.unbias_label = False + ll.num_point_for_scala = 10000 + ll.fit(ll.record_set(train_set[0][:200], train_set[1][:200]), wait=False) + training_job_name = ll.latest_training_job.name + + print("Waiting to re-attach to the training job: %s" % training_job_name) + time.sleep(20) + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + estimator = LinearLearner.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + model = LinearLearnerModel(estimator.model_data, role='SageMakerRole', sagemaker_session=sagemaker_session) + predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) + + result = predictor.predict(train_set[0][0:100]) + assert len(result) == 100 + for record in result: + assert record.label["predicted_label"] is not None + assert record.label["score"] is not None diff --git a/tests/integ/test_mxnet_train.py b/tests/integ/test_mxnet_train.py index 94feb6e9e1..8afc079bb4 100644 --- a/tests/integ/test_mxnet_train.py +++ b/tests/integ/test_mxnet_train.py @@ -58,6 +58,39 @@ def test_attach_deploy(mxnet_training_job, sagemaker_session): predictor.predict(data) +def test_async_fit(sagemaker_session): + + training_job_name = "" + endpoint_name = 'test-mxnet-attach-deploy-{}'.format(int(time.time())) + + with timeout(minutes=15): + script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') + data_path = os.path.join(DATA_DIR, 'mxnet_mnist') + + mx = MXNet(entry_point=script_path, role='SageMakerRole', + train_instance_count=1, train_instance_type='ml.c4.xlarge', + sagemaker_session=sagemaker_session) + + train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'), + key_prefix='integ-test-data/mxnet_mnist/train') + test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'), + key_prefix='integ-test-data/mxnet_mnist/test') + + mx.fit({'train': train_input, 'test': test_input}, wait=False) + training_job_name = mx.latest_training_job.name + + print("Waiting to re-attach to the training job: %s" % training_job_name) + time.sleep(20) + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + print("Re-attaching now to: %s" % training_job_name) + estimator = MXNet.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + predictor = estimator.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) + data = numpy.zeros(shape=(1, 1, 28, 28)) + predictor.predict(data) + + + def test_deploy_model(mxnet_training_job, sagemaker_session): endpoint_name = 'test-mxnet-deploy-model-{}'.format(int(time.time())) diff --git a/tests/integ/test_pca.py b/tests/integ/test_pca.py index adec22345e..bab2a1877a 100644 --- a/tests/integ/test_pca.py +++ b/tests/integ/test_pca.py @@ -14,8 +14,11 @@ import os import pickle import sys +import time + import pytest # noqa import boto3 + import sagemaker import sagemaker.amazon.pca from sagemaker.utils import name_from_base @@ -55,3 +58,46 @@ def test_pca(): assert len(result) == 5 for record in result: assert record.label["projection"] is not None + + +def test_async_pca(): + + training_job_name = "" + endpoint_name = name_from_base('kmeans') + + with timeout(minutes=15): + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + pca = sagemaker.amazon.pca.PCA(role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.m4.xlarge', + num_components=48, sagemaker_session=sagemaker_session, base_job_name='test-pca') + + pca.algorithm_mode = 'randomized' + pca.subtract_mean = True + pca.extra_components = 5 + pca.fit(pca.record_set(train_set[0][:100]), wait=False) + training_job_name = pca.latest_training_job.name + + print("Detached from training job. Will re-attach in 20 seconds") + time.sleep(20) + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + estimator = sagemaker.amazon.pca.PCA.attach(training_job_name=training_job_name, + sagemaker_session=sagemaker_session) + + model = sagemaker.amazon.pca.PCAModel(estimator.model_data, role='SageMakerRole', + sagemaker_session=sagemaker_session) + predictor = model.deploy(initial_instance_count=1, instance_type="ml.c4.xlarge", + endpoint_name=endpoint_name) + + result = predictor.predict(train_set[0][:5]) + + assert len(result) == 5 + for record in result: + assert record.label["projection"] is not None diff --git a/tests/integ/test_tf.py b/tests/integ/test_tf.py index bb602b83fe..6e79ddf024 100644 --- a/tests/integ/test_tf.py +++ b/tests/integ/test_tf.py @@ -13,6 +13,7 @@ import boto3 import os import pytest +import time from sagemaker import Session from sagemaker.tensorflow import TensorFlow @@ -27,6 +28,7 @@ def sagemaker_session(): return Session(boto_session=boto3.Session(region_name=REGION)) +@pytest.mark.skip(reason="Not Today") def test_tf(sagemaker_session): with timeout(minutes=15): script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') @@ -52,6 +54,35 @@ def test_tf(sagemaker_session): print('predict result: {}'.format(result)) +def test_tf_async(sagemaker_session): + + training_job_name = "" + with timeout(minutes=15): + script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') + + estimator = TensorFlow(entry_point=script_path, + role='SageMakerRole', + training_steps=1, + evaluation_steps=1, + hyperparameters={'input_tensor_name': 'inputs'}, + train_instance_count=1, + train_instance_type='ml.c4.xlarge', + sagemaker_session=sagemaker_session, + base_job_name='test-tf') + + inputs = estimator.sagemaker_session.upload_data(path=DATA_PATH, key_prefix='integ-test-data/tf_iris') + estimator.fit(inputs, wait=False) + training_job_name = estimator.latest_training_job.name + time.sleep(20) + + with timeout_and_delete_endpoint(estimator=estimator, minutes=20): + estimator = TensorFlow.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + json_predictor = estimator.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge') + + result = json_predictor.predict([6.4, 3.2, 4.5, 1.5]) + print('predict result: {}'.format(result)) + + def test_failed_tf_training(sagemaker_session): with timeout(minutes=15): script_path = os.path.join(DATA_DIR, 'iris', 'failure_script.py') diff --git a/tox.ini b/tox.ini index a42fe09296..09795ca02e 100644 --- a/tox.ini +++ b/tox.ini @@ -27,7 +27,11 @@ max-complexity = 10 [testenv] # TEAMCITY_VERSION environment variable exists during build on Teamcity. teamcity-messages uses it in order to enable # reporting to TeamCity. -passenv = TEAMCITY_VERSION +passenv = + TEAMCITY_VERSION + AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY + AWS_SESSION_TOKEN # {posargs} can be passed in by additional arguments specified when invoking tox. # Can be used to specify which tests to run, e.g.: tox -- -s commands = From ec53250eb02b8e43a4bb71d9ef61c3f26379f753 Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Thu, 25 Jan 2018 11:34:46 -0800 Subject: [PATCH 2/6] Fix flake8 and remove unused **kwargs --- src/sagemaker/estimator.py | 3 +-- tests/integ/test_linear_learner.py | 1 + tests/integ/test_mxnet_train.py | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index f0e43ba147..28d8c39b64 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -153,13 +153,12 @@ def fit(self, inputs, wait=True, logs=True, job_name=None): if wait: self.latest_training_job.wait(logs=logs) - @classmethod def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): raise NotImplementedError() @classmethod - def attach(cls, training_job_name, sagemaker_session=None, **kwargs): + def attach(cls, training_job_name, sagemaker_session=None): """Attach to an existing training job. Create an Estimator bound to an existing training job. After attaching, if diff --git a/tests/integ/test_linear_learner.py b/tests/integ/test_linear_learner.py index a33840ff61..d8564cc699 100644 --- a/tests/integ/test_linear_learner.py +++ b/tests/integ/test_linear_learner.py @@ -87,6 +87,7 @@ def test_linear_learner(): assert record.label["predicted_label"] is not None assert record.label["score"] is not None + def test_async_linear_learner(sagemaker_session): training_job_name = "" diff --git a/tests/integ/test_mxnet_train.py b/tests/integ/test_mxnet_train.py index 8afc079bb4..458a7bb5b7 100644 --- a/tests/integ/test_mxnet_train.py +++ b/tests/integ/test_mxnet_train.py @@ -90,7 +90,6 @@ def test_async_fit(sagemaker_session): predictor.predict(data) - def test_deploy_model(mxnet_training_job, sagemaker_session): endpoint_name = 'test-mxnet-deploy-model-{}'.format(int(time.time())) From e595cc5e0772b74fa90d4c77118542b7ef6858d7 Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Thu, 25 Jan 2018 13:03:00 -0800 Subject: [PATCH 3/6] Add factorization machines async fit integ test Also fixed the timeouts for all the async fit integ tests. Previously we allowed 15 min timeout for training, and 20 min for hosting. With async fit the sections are split so we allow 5 min timeout for the intial fit call and setup. And then 35 min for the attach() + hosting calls. The total runtime is the same just split differently for async tests. --- tests/integ/test_factorization_machines.py | 43 ++++++++++++++++++++++ tests/integ/test_kmeans.py | 7 +--- tests/integ/test_linear_learner.py | 11 +++--- tests/integ/test_mxnet_train.py | 4 +- tests/integ/test_pca.py | 7 ++-- tests/integ/test_tf.py | 4 +- 6 files changed, 59 insertions(+), 17 deletions(-) diff --git a/tests/integ/test_factorization_machines.py b/tests/integ/test_factorization_machines.py index 76fbb93ac7..18a581bee5 100644 --- a/tests/integ/test_factorization_machines.py +++ b/tests/integ/test_factorization_machines.py @@ -13,6 +13,7 @@ import gzip import pickle import sys +import time import boto3 import os @@ -53,3 +54,45 @@ def test_factorization_machines(): assert len(result) == 10 for record in result: assert record.label["score"] is not None + + +def test_async_factorization_machines(): + + training_job_name = "" + endpoint_name = name_from_base('factorization_machines') + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + + with timeout(minutes=5): + + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + # Load the data into memory as numpy arrays + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + fm = FactorizationMachines(role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.c4.xlarge', + num_factors=10, predictor_type='regressor', + epochs=2, clip_gradient=1e2, eps=0.001, rescale_grad=1.0 / 100, + sagemaker_session=sagemaker_session, base_job_name='test-fm') + + # training labels must be 'float32' + fm.fit(fm.record_set(train_set[0][:200], train_set[1][:200].astype('float32')), wait=False) + training_job_name = fm.latest_training_job.name + + print("Detached from training job. Will re-attach in 20 seconds") + time.sleep(20) + print("attaching now...") + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): + estimator = FactorizationMachines.attach(training_job_name=training_job_name, + sagemaker_session=sagemaker_session) + model = FactorizationMachinesModel(estimator.model_data, role='SageMakerRole', + sagemaker_session=sagemaker_session) + predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) + result = predictor.predict(train_set[0][:10]) + + assert len(result) == 10 + for record in result: + assert record.label["score"] is not None diff --git a/tests/integ/test_kmeans.py b/tests/integ/test_kmeans.py index 6982a95d26..bcaba3ce02 100644 --- a/tests/integ/test_kmeans.py +++ b/tests/integ/test_kmeans.py @@ -24,10 +24,7 @@ from tests.integ import DATA_DIR, REGION from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name -import pytest - -@pytest.mark.skip(reason="no way of currently testing this") def test_kmeans(): with timeout(minutes=15): @@ -71,7 +68,7 @@ def test_async_kmeans(): training_job_name = "" endpoint_name = name_from_base('kmeans') - with timeout(minutes=15): + with timeout(minutes=5): sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} @@ -100,7 +97,7 @@ def test_async_kmeans(): time.sleep(20) print("attaching now...") - with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): estimator = KMeans.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) model = KMeansModel(estimator.model_data, role='SageMakerRole', sagemaker_session=sagemaker_session) predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) diff --git a/tests/integ/test_linear_learner.py b/tests/integ/test_linear_learner.py index d8564cc699..6b287b3186 100644 --- a/tests/integ/test_linear_learner.py +++ b/tests/integ/test_linear_learner.py @@ -14,10 +14,10 @@ import os import pickle import sys +import time import pytest # noqa import boto3 import numpy as np -from datetime import time import sagemaker from sagemaker.amazon.linear_learner import LinearLearner, LinearLearnerModel @@ -88,13 +88,14 @@ def test_linear_learner(): assert record.label["score"] is not None -def test_async_linear_learner(sagemaker_session): +def test_async_linear_learner(): training_job_name = "" endpoint_name = 'test-linear-learner-async-{}'.format(int(time.time())) + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + + with timeout(minutes=5): - with timeout(minutes=15): - sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} @@ -144,7 +145,7 @@ def test_async_linear_learner(sagemaker_session): print("Waiting to re-attach to the training job: %s" % training_job_name) time.sleep(20) - with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): estimator = LinearLearner.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) model = LinearLearnerModel(estimator.model_data, role='SageMakerRole', sagemaker_session=sagemaker_session) predictor = model.deploy(1, 'ml.c4.xlarge', endpoint_name=endpoint_name) diff --git a/tests/integ/test_mxnet_train.py b/tests/integ/test_mxnet_train.py index 458a7bb5b7..593f31e5d7 100644 --- a/tests/integ/test_mxnet_train.py +++ b/tests/integ/test_mxnet_train.py @@ -63,7 +63,7 @@ def test_async_fit(sagemaker_session): training_job_name = "" endpoint_name = 'test-mxnet-attach-deploy-{}'.format(int(time.time())) - with timeout(minutes=15): + with timeout(minutes=5): script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py') data_path = os.path.join(DATA_DIR, 'mxnet_mnist') @@ -82,7 +82,7 @@ def test_async_fit(sagemaker_session): print("Waiting to re-attach to the training job: %s" % training_job_name) time.sleep(20) - with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=35): print("Re-attaching now to: %s" % training_job_name) estimator = MXNet.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) predictor = estimator.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) diff --git a/tests/integ/test_pca.py b/tests/integ/test_pca.py index bab2a1877a..7a00562bf4 100644 --- a/tests/integ/test_pca.py +++ b/tests/integ/test_pca.py @@ -63,10 +63,11 @@ def test_pca(): def test_async_pca(): training_job_name = "" - endpoint_name = name_from_base('kmeans') + endpoint_name = name_from_base('async_pca') + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + + with timeout(minutes=20): - with timeout(minutes=15): - sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} diff --git a/tests/integ/test_tf.py b/tests/integ/test_tf.py index 6e79ddf024..9f570856e4 100644 --- a/tests/integ/test_tf.py +++ b/tests/integ/test_tf.py @@ -57,7 +57,7 @@ def test_tf(sagemaker_session): def test_tf_async(sagemaker_session): training_job_name = "" - with timeout(minutes=15): + with timeout(minutes=5): script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') estimator = TensorFlow(entry_point=script_path, @@ -75,7 +75,7 @@ def test_tf_async(sagemaker_session): training_job_name = estimator.latest_training_job.name time.sleep(20) - with timeout_and_delete_endpoint(estimator=estimator, minutes=20): + with timeout_and_delete_endpoint(estimator=estimator, minutes=35): estimator = TensorFlow.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) json_predictor = estimator.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge') From 0e507900b6b7a71e5584e5c3286cbba440fcdbda Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Fri, 26 Jan 2018 10:53:39 -0800 Subject: [PATCH 4/6] Add exception for async tensorboard and fix tests. Fix the PCA and factorization machines async fit integration tests and add an exception when running Tensorboard with async fit. --- src/sagemaker/tensorflow/estimator.py | 3 +++ tests/integ/test_factorization_machines.py | 2 +- tests/integ/test_pca.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/sagemaker/tensorflow/estimator.py b/src/sagemaker/tensorflow/estimator.py index cd918d2717..ddc71ff846 100644 --- a/src/sagemaker/tensorflow/estimator.py +++ b/src/sagemaker/tensorflow/estimator.py @@ -152,6 +152,9 @@ def fit(self, inputs, wait=True, logs=True, job_name=None, run_tensorboard_local def fit_super(): super(TensorFlow, self).fit(inputs, wait, logs, job_name) + if run_tensorboard_locally and wait is False: + raise ValueError("Tensorboard is not supported with async fit") + if run_tensorboard_locally: tensorboard = Tensorboard(self) tensorboard.validate_requirements() diff --git a/tests/integ/test_factorization_machines.py b/tests/integ/test_factorization_machines.py index 18a581bee5..cc04ed8d6a 100644 --- a/tests/integ/test_factorization_machines.py +++ b/tests/integ/test_factorization_machines.py @@ -59,7 +59,7 @@ def test_factorization_machines(): def test_async_factorization_machines(): training_job_name = "" - endpoint_name = name_from_base('factorization_machines') + endpoint_name = name_from_base('factorizationMachines') sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) with timeout(minutes=5): diff --git a/tests/integ/test_pca.py b/tests/integ/test_pca.py index 7a00562bf4..b13994f35a 100644 --- a/tests/integ/test_pca.py +++ b/tests/integ/test_pca.py @@ -63,7 +63,7 @@ def test_pca(): def test_async_pca(): training_job_name = "" - endpoint_name = name_from_base('async_pca') + endpoint_name = name_from_base('pca') sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) with timeout(minutes=20): From 24ae51eb38fb8c8cdaafcdbcef9a7b3eb90c4eea Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Mon, 29 Jan 2018 17:11:13 -0800 Subject: [PATCH 5/6] Add BYOA implementation and missing docs. BYO was missing an implementation of _from_training_job(). This adds that as well as an integration test to verify that. Also addressed the PR comments and added information to the README. --- README.rst | 23 +++++++- src/sagemaker/amazon/amazon_estimator.py | 12 ++++- src/sagemaker/estimator.py | 63 +++++++++++++++++++--- src/sagemaker/mxnet/estimator.py | 12 +++++ src/sagemaker/tensorflow/estimator.py | 12 +++++ tests/integ/test_byo_estimator.py | 69 +++++++++++++++++++++--- 6 files changed, 174 insertions(+), 17 deletions(-) diff --git a/README.rst b/README.rst index e06dea8b3c..c80ef02d4b 100644 --- a/README.rst +++ b/README.rst @@ -97,7 +97,7 @@ SageMaker Python SDK provides several high-level abstractions for working with A - **Estimators**: Encapsulate training on SageMaker. Can be ``fit()`` to run training, then the resulting model ``deploy()`` ed to a SageMaker Endpoint. - **Models**: Encapsulate built ML models. Can be ``deploy()`` ed to a SageMaker Endpoint. - **Predictors**: Provide real-time inference and transformation using Python data-types against a SageMaker Endpoint. -- **Session**: Provides a collection of convience methods for working with SageMaker resources. +- **Session**: Provides a collection of convenience methods for working with SageMaker resources. Estimator and Model implementations for MXNet, TensorFlow, and Amazon ML algorithms are included. There's also an Estimator that runs SageMaker compatible custom Docker containers, allowing you to run your own ML algorithms via SageMaker Python SDK. @@ -1150,6 +1150,7 @@ Optional arguments - ``wait (bool)``: Defaults to True, whether to block and wait for the training script to complete before returning. + If set to False, it will return immediately, and can later be attached to. - ``logs (bool)``: Defaults to True, whether to show logs produced by training job in the Python session. Only meaningful when wait is True. - ``run_tensorboard_locally (bool)``: Defaults to False. Executes TensorBoard in a different @@ -1178,9 +1179,25 @@ the ``TensorFlow`` estimator parameter ``training_steps`` is finished or when th job execution time reaches the ``TensorFlow`` estimator parameter ``train_max_run``. When the training job finishes, a `TensorFlow serving `_ -with the result of the training is generated and saved to the S3 location define by +with the result of the training is generated and saved to the S3 location defined by the ``TensorFlow`` estimator parameter ``output_path``. + +If the ``wait=False`` flag is passed to ``fit``, then it will return immediately. The training job will continue running +asynchronously. At a later time, a Tensorflow Estimator can be obtained by attaching to the existing training job. If +the training job is not finished it will start showing the standard output of training and wait until it completes. +After attaching, the estimator can be deployed as usual. + +.. code:: python + + tf_estimator.fit(your_input_data, wait=False) + training_job_name = tf_estimator.latest_training_job.name + + # after some time, or in a separate python notebook, we can attach to it again. + + tf_estimator = TensorFlow.attach(training_job_name=training_job_name) + + The evaluation process """""""""""""""""""""" @@ -1244,6 +1261,8 @@ You can access TensorBoard locally at http://localhost:6006 or using your SakeMa `https*workspace_base_url*proxy/6006/ `_ (TensorBoard will not work if you forget to put the slash, '/', in end of the url). If TensorBoard started on a different port, adjust these URLs to match. +Note that TensorBoard is not supported when passing wait=False to ``fit``. + Deploying TensorFlow Serving models ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index 4aca72c06e..910913ecf7 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -66,11 +66,21 @@ def data_location(self, data_location): @classmethod def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + """Create an Estimator from existing training job data. + + Args: + init_params (dict): The init_params the training job was created with. + hyperparameters (dict): The hyperparameters the training job was created with. + image (str): Container image (if any) the training job was created with + sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + + Returns: An instance of the calling Estimator Class. + + """ # The hyperparam names may not be the same as the class attribute that holds them, # for instance: local_lloyd_init_method is called local_init_method. We need to map these # and pass the correct name to the constructor. - for attribute, value in cls.__dict__.items(): if isinstance(value, hp): if value.name in hyperparameters: diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index 28d8c39b64..ce9b59b7d7 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -155,14 +155,26 @@ def fit(self, inputs, wait=True, logs=True, job_name=None): @classmethod def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + """Create an Estimator from existing training job data. + + Args: + init_params (dict): The init_params the training job was created with. + hyperparameters (dict): The hyperparameters the training job was created with. + image (str): Container image (if any) the training job was created with + sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + + Returns: An instance of the calling Estimator Class. + + """ raise NotImplementedError() @classmethod def attach(cls, training_job_name, sagemaker_session=None): """Attach to an existing training job. - Create an Estimator bound to an existing training job. After attaching, if - the training job has a Complete status, it can be ``deploy()`` ed to create + Create an Estimator bound to an existing training job, each subclass is responsible to implement + ``from_training_job()`` as this method delegates the actual Estimator creation to it. After + attaching, if the training job has a Complete status, it can be ``deploy()`` ed to create a SageMaker Endpoint and return a ``Predictor``. If the training job is in progress, attach will block and display log messages @@ -173,17 +185,22 @@ def attach(cls, training_job_name, sagemaker_session=None): sagemaker_session (sagemaker.session.Session): Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one using the default AWS configuration chain. - **kwargs: Additional kwargs passed to the :class:`~sagemaker.estimator.Estimator` constructor. + + Examples: + >>> my_estimator.fit(wait=False) + >>> training_job_name = my_estimator.latest_training_job.name + Later on: + >>> attached_estimator = Estimator.attach(training_job_name) + >>> attached_estimator.deploy() Returns: - sagemaker.estimator.Framework: ``Estimator`` with the attached training job. + Instance of the calling ``Estimator`` Class with the attached training job. """ sagemaker_session = sagemaker_session or Session() - if training_job_name is not None: + if training_job_name: job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) init_params, hp, image = cls._prepare_estimator_params_from_job_description(job_details) - else: raise ValueError('must specify training_job name') @@ -460,6 +477,25 @@ def predict_wrapper(endpoint, session): return Model(self.model_data, image or self.train_image(), self.role, sagemaker_session=self.sagemaker_session, predictor_cls=predictor_cls, **kwargs) + @classmethod + def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + """Create an Estimator from existing training job data. + + Args: + init_params (dict): The init_params the training job was created with. + hyperparameters (dict): The hyperparameters the training job was created with. + image (str): Container image (if any) the training job was created with + sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + + Returns: An instance of the calling Estimator Class. + + """ + + estimator = cls(sagemaker_session=sagemaker_session, **init_params) + cls.set_hyperparameters(**hyperparameters) + + return estimator + class Framework(EstimatorBase): """Base class that cannot be instantiated directly. @@ -567,6 +603,17 @@ def hyperparameters(self): @classmethod def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + """Create an Estimator from existing training job data. + + Args: + init_params (dict): The init_params the training job was created with. + hyperparameters (dict): The hyperparameters the training job was created with. + image (str): Container image (if any) the training job was created with + sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + + Returns: An instance of the calling Estimator Class. + + """ # parameters for framework classes framework_init_params = dict() @@ -578,8 +625,8 @@ def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_sessi hyperparameters.get(CONTAINER_LOG_LEVEL_PARAM_NAME)) # drop json and remove other SageMaker specific additions - hp_map = {entry: json.loads(hyperparameters[entry]) for entry in hyperparameters} - framework_init_params['hyperparameters'] = hp_map + deserialized_hps = {entry: json.loads(hyperparameters[entry]) for entry in hyperparameters} + framework_init_params['hyperparameters'] = deserialized_hps init_params.update(framework_init_params) diff --git a/src/sagemaker/mxnet/estimator.py b/src/sagemaker/mxnet/estimator.py index de3f81d98d..422bca4ad9 100644 --- a/src/sagemaker/mxnet/estimator.py +++ b/src/sagemaker/mxnet/estimator.py @@ -83,6 +83,18 @@ def create_model(self, model_server_workers=None): @classmethod def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + """Create an Estimator from existing training job data. + + Args: + init_params (dict): The init_params the training job was created with. + hyperparameters (dict): The hyperparameters the training job was created with. + image (str): Container image (if any) the training job was created with + sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + + Returns: An instance of the calling Estimator Class. + + """ + framework, py_version = framework_name_from_image(image) init_params.update({'py_version': py_version}) diff --git a/src/sagemaker/tensorflow/estimator.py b/src/sagemaker/tensorflow/estimator.py index ddc71ff846..45d8ebcac4 100644 --- a/src/sagemaker/tensorflow/estimator.py +++ b/src/sagemaker/tensorflow/estimator.py @@ -169,6 +169,18 @@ def fit_super(): @classmethod def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): + """Create an Estimator from existing training job data. + + Args: + init_params (dict): The init_params the training job was created with. + hyperparameters (dict): The hyperparameters the training job was created with. + image (str): Container image (if any) the training job was created with + sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + + Returns: An instance of the calling Estimator Class. + + """ + updated_params = cls._update_init_params(hyperparameters, ['checkpoint_path', 'training_steps', 'evaluation_steps']) init_params.update(updated_params) diff --git a/tests/integ/test_byo_estimator.py b/tests/integ/test_byo_estimator.py index d0c1a18e07..eb6840705d 100644 --- a/tests/integ/test_byo_estimator.py +++ b/tests/integ/test_byo_estimator.py @@ -29,6 +29,13 @@ from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name +def fm_serializer(data): + js = {'instances': []} + for row in data: + js['instances'].append({'features': row.tolist()}) + return json.dumps(js) + + def test_byo_estimator(): """Use Factorization Machines algorithm as an example here. @@ -79,12 +86,6 @@ def test_byo_estimator(): endpoint_name = name_from_base('byo') - def fm_serializer(data): - js = {'instances': []} - for row in data: - js['instances'].append({'features': row.tolist()}) - return json.dumps(js) - with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=20): model = estimator.create_model() predictor = model.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) @@ -97,3 +98,59 @@ def fm_serializer(data): assert len(result['predictions']) == 10 for prediction in result['predictions']: assert prediction['score'] is not None + + +def test_async_byo_estimator(): + image_name = registry(REGION) + "/factorization-machines:1" + endpoint_name = name_from_base('byo') + training_job_name = "" + + with timeout(minutes=5): + sagemaker_session = sagemaker.Session(boto_session=boto3.Session(region_name=REGION)) + data_path = os.path.join(DATA_DIR, 'one_p_mnist', 'mnist.pkl.gz') + pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'} + + with gzip.open(data_path, 'rb') as f: + train_set, _, _ = pickle.load(f, **pickle_args) + + # take 100 examples for faster execution + vectors = np.array([t.tolist() for t in train_set[0][:100]]).astype('float32') + labels = np.where(np.array([t.tolist() for t in train_set[1][:100]]) == 0, 1.0, 0.0).astype('float32') + + buf = io.BytesIO() + write_numpy_to_dense_tensor(buf, vectors, labels) + buf.seek(0) + + bucket = sagemaker_session.default_bucket() + prefix = 'test_byo_estimator' + key = 'recordio-pb-data' + boto3.resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'train', key)).upload_fileobj(buf) + s3_train_data = 's3://{}/{}/train/{}'.format(bucket, prefix, key) + + estimator = Estimator(image_name=image_name, + role='SageMakerRole', train_instance_count=1, + train_instance_type='ml.c4.xlarge', + sagemaker_session=sagemaker_session, base_job_name='test-byo') + + estimator.set_hyperparameters(num_factors=10, + feature_dim=784, + mini_batch_size=100, + predictor_type='binary_classifier') + + # training labels must be 'float32' + estimator.fit({'train': s3_train_data}, wait=False) + training_job_name = estimator.latest_training_job.name + + with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, minutes=30): + estimator = Estimator.attach(training_job_name=training_job_name, sagemaker_session=sagemaker_session) + model = estimator.create_model() + predictor = model.deploy(1, 'ml.m4.xlarge', endpoint_name=endpoint_name) + predictor.serializer = fm_serializer + predictor.content_type = 'application/json' + predictor.deserializer = sagemaker.predictor.json_deserializer + + result = predictor.predict(train_set[0][:10]) + + assert len(result['predictions']) == 10 + for prediction in result['predictions']: + assert prediction['score'] is not None From 7918d8bbada1aefcff53ea7f7e1bc3d5e1c983e7 Mon Sep 17 00:00:00 2001 From: Ignacio Quintero Date: Wed, 31 Jan 2018 13:37:44 -0800 Subject: [PATCH 6/6] Refactor attach to remove _from_training_job() _prepare_init_params_from_job_description() is now a classmethod instead of being a static method. Each class is responsible to implement their specific logic to convert a training job description into arguments that can be passed to its own __init__() --- src/sagemaker/amazon/amazon_estimator.py | 21 ++-- src/sagemaker/estimator.py | 143 ++++++++++++++--------- src/sagemaker/mxnet/estimator.py | 21 ++-- src/sagemaker/tensorflow/estimator.py | 32 ++--- tests/integ/test_byo_estimator.py | 2 + tests/integ/test_tf.py | 1 - tests/unit/test_estimator.py | 12 +- tests/unit/test_mxnet.py | 6 - tests/unit/test_tf_estimator.py | 6 - 9 files changed, 132 insertions(+), 112 deletions(-) diff --git a/src/sagemaker/amazon/amazon_estimator.py b/src/sagemaker/amazon/amazon_estimator.py index 910913ecf7..5475135bf4 100644 --- a/src/sagemaker/amazon/amazon_estimator.py +++ b/src/sagemaker/amazon/amazon_estimator.py @@ -65,28 +65,29 @@ def data_location(self, data_location): self._data_location = data_location @classmethod - def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): - """Create an Estimator from existing training job data. + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor Args: - init_params (dict): The init_params the training job was created with. - hyperparameters (dict): The hyperparameters the training job was created with. - image (str): Container image (if any) the training job was created with - sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + job_details: the returned job details from a describe_training_job API call. - Returns: An instance of the calling Estimator Class. + Returns: + dictionary: The transformed init_params """ + init_params = super(AmazonAlgorithmEstimatorBase, cls)._prepare_init_params_from_job_description(job_details) # The hyperparam names may not be the same as the class attribute that holds them, # for instance: local_lloyd_init_method is called local_init_method. We need to map these # and pass the correct name to the constructor. for attribute, value in cls.__dict__.items(): if isinstance(value, hp): - if value.name in hyperparameters: - init_params[attribute] = hyperparameters[value.name] + if value.name in init_params['hyperparameters']: + init_params[attribute] = init_params['hyperparameters'][value.name] - return cls(sagemaker_session=sagemaker_session, **init_params) + del init_params['hyperparameters'] + del init_params['image'] + return init_params def fit(self, records, mini_batch_size=None, **kwargs): """Fit this Estimator on serialized Record objects, stored in S3. diff --git a/src/sagemaker/estimator.py b/src/sagemaker/estimator.py index ce9b59b7d7..c672703315 100644 --- a/src/sagemaker/estimator.py +++ b/src/sagemaker/estimator.py @@ -169,13 +169,13 @@ def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_sessi raise NotImplementedError() @classmethod - def attach(cls, training_job_name, sagemaker_session=None): + def attach(cls, training_job_name, sagemaker_session=None, job_details=None): """Attach to an existing training job. Create an Estimator bound to an existing training job, each subclass is responsible to implement - ``from_training_job()`` as this method delegates the actual Estimator creation to it. After - attaching, if the training job has a Complete status, it can be ``deploy()`` ed to create - a SageMaker Endpoint and return a ``Predictor``. + ``_prepare_init_params_from_job_description()`` as this method delegates the actual conversion of a training + job description to the arguments that the class constructor expects. After attaching, if the training job has a + Complete status, it can be ``deploy()`` ed to create a SageMaker Endpoint and return a ``Predictor``. If the training job is in progress, attach will block and display log messages from the training job, until the training job completes. @@ -198,13 +198,10 @@ def attach(cls, training_job_name, sagemaker_session=None): """ sagemaker_session = sagemaker_session or Session() - if training_job_name: - job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) - init_params, hp, image = cls._prepare_estimator_params_from_job_description(job_details) - else: - raise ValueError('must specify training_job name') + job_details = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name) + init_params = cls._prepare_init_params_from_job_description(job_details) - estimator = cls._from_training_job(init_params, hp, image, sagemaker_session) + estimator = cls(sagemaker_session=sagemaker_session, **init_params) estimator.latest_training_job = _TrainingJob(sagemaker_session=sagemaker_session, training_job_name=init_params['base_job_name']) estimator.latest_training_job.wait() @@ -257,21 +254,33 @@ def create_model(self, **kwargs): """ pass - @staticmethod - def _prepare_estimator_params_from_job_description(job_details): - estimator_params = dict() + @classmethod + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor - estimator_params['role'] = job_details['RoleArn'] - estimator_params['train_instance_count'] = job_details['ResourceConfig']['InstanceCount'] - estimator_params['train_instance_type'] = job_details['ResourceConfig']['InstanceType'] - estimator_params['train_volume_size'] = job_details['ResourceConfig']['VolumeSizeInGB'] - estimator_params['train_max_run'] = job_details['StoppingCondition']['MaxRuntimeInSeconds'] - estimator_params['input_mode'] = job_details['AlgorithmSpecification']['TrainingInputMode'] - estimator_params['base_job_name'] = job_details['TrainingJobName'] - estimator_params['output_path'] = job_details['OutputDataConfig']['S3OutputPath'] - estimator_params['output_kms_key'] = job_details['OutputDataConfig']['KmsKeyId'] + Args: + job_details: the returned job details from a describe_training_job API call. - return estimator_params, job_details['HyperParameters'], job_details['AlgorithmSpecification']['TrainingImage'] + Returns: + dictionary: The transformed init_params + + """ + init_params = dict() + + init_params['role'] = job_details['RoleArn'] + init_params['train_instance_count'] = job_details['ResourceConfig']['InstanceCount'] + init_params['train_instance_type'] = job_details['ResourceConfig']['InstanceType'] + init_params['train_volume_size'] = job_details['ResourceConfig']['VolumeSizeInGB'] + init_params['train_max_run'] = job_details['StoppingCondition']['MaxRuntimeInSeconds'] + init_params['input_mode'] = job_details['AlgorithmSpecification']['TrainingInputMode'] + init_params['base_job_name'] = job_details['TrainingJobName'] + init_params['output_path'] = job_details['OutputDataConfig']['S3OutputPath'] + init_params['output_kms_key'] = job_details['OutputDataConfig']['KmsKeyId'] + + init_params['hyperparameters'] = job_details['HyperParameters'] + init_params['image'] = job_details['AlgorithmSpecification']['TrainingImage'] + + return init_params def delete_endpoint(self): """Delete an Amazon SageMaker ``Endpoint``. @@ -388,7 +397,8 @@ class Estimator(EstimatorBase): def __init__(self, image_name, role, train_instance_count, train_instance_type, train_volume_size=30, train_max_run=24 * 60 * 60, input_mode='File', - output_path=None, output_kms_key=None, base_job_name=None, sagemaker_session=None): + output_path=None, output_kms_key=None, base_job_name=None, sagemaker_session=None, + hyperparameters=None): """Initialize an ``Estimator`` instance. Args: @@ -420,9 +430,10 @@ def __init__(self, image_name, role, train_instance_count, train_instance_type, sagemaker_session (sagemaker.session.Session): Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one using the default AWS configuration chain. + hyperparameters (dict): Dictionary containing the hyperparameters to initialize this estimator with. """ self.image_name = image_name - self.hyperparam_dict = {} + self.hyperparam_dict = hyperparameters.copy() if hyperparameters else {} super(Estimator, self).__init__(role, train_instance_count, train_instance_type, train_volume_size, train_max_run, input_mode, output_path, output_kms_key, base_job_name, sagemaker_session) @@ -478,23 +489,20 @@ def predict_wrapper(endpoint, session): predictor_cls=predictor_cls, **kwargs) @classmethod - def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): - """Create an Estimator from existing training job data. + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor Args: - init_params (dict): The init_params the training job was created with. - hyperparameters (dict): The hyperparameters the training job was created with. - image (str): Container image (if any) the training job was created with - sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + job_details: the returned job details from a describe_training_job API call. - Returns: An instance of the calling Estimator Class. + Returns: + dictionary: The transformed init_params """ + init_params = super(Estimator, cls)._prepare_init_params_from_job_description(job_details) - estimator = cls(sagemaker_session=sagemaker_session, **init_params) - cls.set_hyperparameters(**hyperparameters) - - return estimator + init_params['image_name'] = init_params.pop('image') + return init_params class Framework(EstimatorBase): @@ -602,35 +610,58 @@ def hyperparameters(self): return self._json_encode_hyperparameters(self._hyperparameters) @classmethod - def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): - """Create an Estimator from existing training job data. + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor Args: - init_params (dict): The init_params the training job was created with. - hyperparameters (dict): The hyperparameters the training job was created with. - image (str): Container image (if any) the training job was created with - sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + job_details: the returned job details from a describe_training_job API call. - Returns: An instance of the calling Estimator Class. + Returns: + dictionary: The transformed init_params """ + init_params = super(Framework, cls)._prepare_init_params_from_job_description(job_details) - # parameters for framework classes - framework_init_params = dict() - framework_init_params['entry_point'] = json.loads(hyperparameters.get(SCRIPT_PARAM_NAME)) - framework_init_params['source_dir'] = json.loads(hyperparameters.get(DIR_PARAM_NAME)) - framework_init_params['enable_cloudwatch_metrics'] = json.loads( - hyperparameters.get(CLOUDWATCH_METRICS_PARAM_NAME)) - framework_init_params['container_log_level'] = json.loads( - hyperparameters.get(CONTAINER_LOG_LEVEL_PARAM_NAME)) + init_params['entry_point'] = json.loads(init_params['hyperparameters'].get(SCRIPT_PARAM_NAME)) + init_params['source_dir'] = json.loads(init_params['hyperparameters'].get(DIR_PARAM_NAME)) + init_params['enable_cloudwatch_metrics'] = json.loads( + init_params['hyperparameters'].get(CLOUDWATCH_METRICS_PARAM_NAME)) + init_params['container_log_level'] = json.loads( + init_params['hyperparameters'].get(CONTAINER_LOG_LEVEL_PARAM_NAME)) - # drop json and remove other SageMaker specific additions - deserialized_hps = {entry: json.loads(hyperparameters[entry]) for entry in hyperparameters} - framework_init_params['hyperparameters'] = deserialized_hps + init_params['hyperparameters'] = {k: json.loads(v) for k, v in init_params['hyperparameters'].items()} - init_params.update(framework_init_params) + return init_params - estimator = cls(sagemaker_session=sagemaker_session, **init_params) + @classmethod + def attach(cls, training_job_name, sagemaker_session=None): + """Attach to an existing training job. + + Create an Estimator bound to an existing training job, each subclass is responsible to implement + ``_prepare_init_params_from_job_description()`` as this method delegates the actual conversion of a training + job description to the arguments that the class constructor expects. After attaching, if the training job has a + Complete status, it can be ``deploy()`` ed to create a SageMaker Endpoint and return a ``Predictor``. + + If the training job is in progress, attach will block and display log messages + from the training job, until the training job completes. + + Args: + training_job_name (str): The name of the training job to attach to. + sagemaker_session (sagemaker.session.Session): Session object which manages interactions with + Amazon SageMaker APIs and any other AWS services needed. If not specified, the estimator creates one + using the default AWS configuration chain. + + Examples: + >>> my_estimator.fit(wait=False) + >>> training_job_name = my_estimator.latest_training_job.name + Later on: + >>> attached_estimator = Estimator.attach(training_job_name) + >>> attached_estimator.deploy() + + Returns: + Instance of the calling ``Estimator`` Class with the attached training job. + """ + estimator = super(Framework, cls).attach(training_job_name, sagemaker_session) estimator.uploaded_code = UploadedCode(estimator.source_dir, estimator.entry_point) return estimator diff --git a/src/sagemaker/mxnet/estimator.py b/src/sagemaker/mxnet/estimator.py index 422bca4ad9..2793089c0f 100644 --- a/src/sagemaker/mxnet/estimator.py +++ b/src/sagemaker/mxnet/estimator.py @@ -82,24 +82,23 @@ def create_model(self, model_server_workers=None): sagemaker_session=self.sagemaker_session) @classmethod - def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): - """Create an Estimator from existing training job data. + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor Args: - init_params (dict): The init_params the training job was created with. - hyperparameters (dict): The hyperparameters the training job was created with. - image (str): Container image (if any) the training job was created with - sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + job_details: the returned job details from a describe_training_job API call. - Returns: An instance of the calling Estimator Class. + Returns: + dictionary: The transformed init_params """ + init_params = super(MXNet, cls)._prepare_init_params_from_job_description(job_details) + framework, py_version = framework_name_from_image(init_params.pop('image')) - framework, py_version = framework_name_from_image(image) - init_params.update({'py_version': py_version}) - + init_params['py_version'] = py_version training_job_name = init_params['base_job_name'] + if framework != cls.__framework_name__: raise ValueError("Training job: {} didn't use image for requested framework".format(training_job_name)) - return super(MXNet, cls)._from_training_job(init_params, hyperparameters, image, sagemaker_session) + return init_params diff --git a/src/sagemaker/tensorflow/estimator.py b/src/sagemaker/tensorflow/estimator.py index 45d8ebcac4..463a57d9df 100644 --- a/src/sagemaker/tensorflow/estimator.py +++ b/src/sagemaker/tensorflow/estimator.py @@ -11,12 +11,11 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. import logging +import os import subprocess import tempfile import threading -import os - import sagemaker.tensorflow from sagemaker.estimator import Framework from sagemaker.fw_utils import create_image_uri, framework_name_from_image @@ -168,31 +167,32 @@ def fit_super(): fit_super() @classmethod - def _from_training_job(cls, init_params, hyperparameters, image, sagemaker_session): - """Create an Estimator from existing training job data. + def _prepare_init_params_from_job_description(cls, job_details): + """Convert the job description to init params that can be handled by the class constructor Args: - init_params (dict): The init_params the training job was created with. - hyperparameters (dict): The hyperparameters the training job was created with. - image (str): Container image (if any) the training job was created with - sagemaker_session (sagemaker.session.Session): A sagemaker Session to pass to the estimator. + job_details: the returned job details from a describe_training_job API call. - Returns: An instance of the calling Estimator Class. + Returns: + dictionary: The transformed init_params """ + init_params = super(TensorFlow, cls)._prepare_init_params_from_job_description(job_details) - updated_params = cls._update_init_params(hyperparameters, - ['checkpoint_path', 'training_steps', 'evaluation_steps']) - init_params.update(updated_params) + # Move some of the tensorflow specific init params from hyperparameters into the main init params. + for argument in ['checkpoint_path', 'training_steps', 'evaluation_steps']: + value = init_params['hyperparameters'].pop(argument, None) + if value is not None: + init_params[argument] = value - framework, py_version = framework_name_from_image(image) - init_params.update({'py_version': py_version}) - training_job_name = init_params['base_job_name'] + framework, py_version = framework_name_from_image(init_params.pop('image')) + init_params['py_version'] = py_version + training_job_name = init_params['base_job_name'] if framework != cls.__framework_name__: raise ValueError("Training job: {} didn't use image for requested framework".format(training_job_name)) - return super(TensorFlow, cls)._from_training_job(init_params, hyperparameters, image, sagemaker_session) + return init_params def train_image(self): """Return the Docker image to use for training. diff --git a/tests/integ/test_byo_estimator.py b/tests/integ/test_byo_estimator.py index eb6840705d..71f3c86862 100644 --- a/tests/integ/test_byo_estimator.py +++ b/tests/integ/test_byo_estimator.py @@ -154,3 +154,5 @@ def test_async_byo_estimator(): assert len(result['predictions']) == 10 for prediction in result['predictions']: assert prediction['score'] is not None + + assert estimator.train_image() == image_name diff --git a/tests/integ/test_tf.py b/tests/integ/test_tf.py index 9f570856e4..dc3c13cc96 100644 --- a/tests/integ/test_tf.py +++ b/tests/integ/test_tf.py @@ -28,7 +28,6 @@ def sagemaker_session(): return Session(boto_session=boto3.Session(region_name=REGION)) -@pytest.mark.skip(reason="Not Today") def test_tf(sagemaker_session): with timeout(minutes=15): script_path = os.path.join(DATA_DIR, 'iris', 'iris-dnn-classifier.py') diff --git a/tests/unit/test_estimator.py b/tests/unit/test_estimator.py index eb999c4a18..7542cb6880 100644 --- a/tests/unit/test_estimator.py +++ b/tests/unit/test_estimator.py @@ -70,6 +70,12 @@ def train_image(self): def create_model(self): return DummyFrameworkModel(self.sagemaker_session) + @classmethod + def _prepare_init_params_from_job_description(cls, job_details): + init_params = super(DummyFramework, cls)._prepare_init_params_from_job_description(job_details) + init_params.pop("image", None) + return init_params + class DummyFrameworkModel(FrameworkModel): @@ -251,12 +257,6 @@ def test_attach_framework(sagemaker_session): assert framework_estimator.entry_point == 'iris-dnn-classifier.py' -def test_attach_no_job_name_framework(sagemaker_session): - with pytest.raises(ValueError) as error: - Framework.attach(training_job_name=None, sagemaker_session=sagemaker_session) - assert 'must specify training_job name' in str(error) - - def test_fit_then_fit_again(sagemaker_session): fw = DummyFramework(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session, train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE, diff --git a/tests/unit/test_mxnet.py b/tests/unit/test_mxnet.py index 1f09b54849..a325facb48 100644 --- a/tests/unit/test_mxnet.py +++ b/tests/unit/test_mxnet.py @@ -201,9 +201,3 @@ def test_attach_wrong_framework(sagemaker_session): with pytest.raises(ValueError) as error: MXNet.attach(training_job_name='neo', sagemaker_session=sagemaker_session) assert "didn't use image for requested framework" in str(error) - - -def test_attach_no_job_name(sagemaker_session): - with pytest.raises(ValueError) as error: - MXNet.attach(training_job_name=None, sagemaker_session=sagemaker_session) - assert "must specify training_job name" in str(error) diff --git a/tests/unit/test_tf_estimator.py b/tests/unit/test_tf_estimator.py index dd73ac293b..367e66da65 100644 --- a/tests/unit/test_tf_estimator.py +++ b/tests/unit/test_tf_estimator.py @@ -379,9 +379,3 @@ def test_attach_wrong_framework(sagemaker_session): with pytest.raises(ValueError) as error: TensorFlow.attach(training_job_name='neo', sagemaker_session=sagemaker_session) assert "didn't use image for requested framework" in str(error) - - -def test_attach_no_job_name(sagemaker_session): - with pytest.raises(ValueError) as error: - TensorFlow.attach(training_job_name=None, sagemaker_session=sagemaker_session) - assert "must specify training_job name" in str(error)