Ask-and-tell: How to use Optuna with Spark
Optuna is a hyperparameter optimisation framework that implements a variety of efficient sampling algorithms. Although distributing the optimisation task with Spark is not part of Optuna’s API, it’s relatively straightforward to do batch optimisation using the ‘ask-and-tell interface’. The Hyperopt Python package does have built-in Spark support, but it requires Hyperopt to be installed on the Spark executors. Moreover, Optuna has some nice features that Hyperopt does not have. In this blog post I show how to use Optuna with Spark.
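To make the pattern concrete before bringing Spark into the picture, here is a minimal sequential sketch of the ask-and-tell loop on a toy quadratic objective (illustration only; it is not part of the example that follows):

import optuna

# Minimal sequential ask-and-tell loop on a toy objective (illustration only).
study = optuna.create_study(direction='minimize')
for _ in range(20):
    trial = study.ask()                     # ask the study for a new trial
    x = trial.suggest_float('x', -10, 10)   # draw a hyperparameter value
    value = (x - 2) ** 2                    # evaluate the objective ourselves
    study.tell(trial, value)                # report the result back to the study
print(study.best_params)

The key point is that Optuna never calls the objective function itself: we ask for trials, evaluate them however and wherever we like, and tell the study the results. That is exactly the hook we use to push the evaluations onto Spark executors below.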
First, we create a Spark session and load the California housing dataset from scikit-learn that we will use for demonstration. To speed things up, we take a sample of 1,000 rows from this dataset.

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
from sklearn.datasets import fetch_california_housing
df = pd.concat(fetch_california_housing(as_frame=True, return_X_y=True), axis=1)
df = df.sample(n=1000, random_state=0)
target = 'MedHouseVal'
features = df.drop(columns=[target]).columns.to_list()

We’re going to tune the hyperparameters of an XGBoost regression model. The hyperparameters that we tune are the learning rate, the maximum tree depth, the number of estimators and the L1 (alpha) and L2 (lambda) regularisation parameters. We create a function that takes a single set of hyperparameter values and returns the mean of the cross-validation scores, which we calculate with scikit-learn’s cross_validate function. By convention, ‘scores’ in scikit-learn are defined so that higher is better, which is why we use the negative root mean squared error. This function will run on the executors of the Spark cluster. Spark does not necessarily return the results in the same order in which we submitted the parameter sets, and for Optuna it is crucial that each objective value is matched to the hyperparameters that produced it. To make sure the trial number and the objective value ‘stay together’, we pass the trial number as an argument to our objective function and return it together with the objective value.
from xgboost import XGBRegressor
from sklearn.model_selection import cross_validate
def calculate_objective(trial_number, learning_rate, max_depth, n_estimators, reg_alpha, reg_lambda):
    # Train and cross-validate an XGBoost regressor for one hyperparameter set.
    estimator = XGBRegressor(learning_rate=learning_rate,
                             max_depth=max_depth,
                             n_estimators=n_estimators,
                             reg_alpha=reg_alpha,
                             reg_lambda=reg_lambda)
    result = cross_validate(estimator,
                            df[features],
                            df[target],
                            scoring='neg_root_mean_squared_error')
    # Return the trial number with the score so results can be matched to trials.
    return trial_number, result['test_score'].mean()

Now we need to define the search space of the hyperparameters and parallelise the calculation. The Tree-structured Parzen Estimator (TPE) is Optuna’s default sampler. A good explanation of this algorithm can be found here. The first step in this algorithm is to randomly sample hyperparameter combinations and calculate the cost function for each. Setting the number of randomly sampled combinations to a large value reduces the probability of ending up in a local optimum instead of the desired global optimum. As I’m running this locally, the ‘cluster’ contains only 8 executors, so we set the batch size to 8. Let’s say we want to evaluate 50 batches, of which the first 20 are ‘startup’ batches consisting of randomly drawn hyperparameter combinations. As we use the negative root mean squared error as cost function, we need to tell Optuna to maximize its value. The Optuna documentation advises setting constant_liar to True for batch optimisation; this prevents multiple executors from evaluating very similar hyperparameter combinations.
We provide all random hyperparameter combinations to the Spark cluster at once, rather than in batches of 8. This way, we don’t need to wait for the slowest of 8 evaluations to finish before continuing with the next set; all executors stay busy until all random hyperparameter combinations have been evaluated. To obtain hyperparameter combinations to evaluate, the Study instance has an ask method; the result is reported back to the Study instance using the tell method. After the startup trials, we ask for new hyperparameter sets after each batch of 8 evaluations, in order to take advantage of Optuna’s optimisation algorithm.
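One practical note before we parallelise: calculate_objective reads the driver-side pandas DataFrame df, so Spark ships that data to the executors as part of the serialised function. For a 1,000-row sample this is negligible, but for larger datasets a broadcast variable sends the data to each executor only once. A minimal sketch of that variant (df_bc and calculate_objective_bc are hypothetical names, not part of the code below):

# Hypothetical variant: broadcast the training data once per executor instead
# of shipping it inside the serialised task function each time.
df_bc = spark.sparkContext.broadcast(df[features + [target]])

def calculate_objective_bc(trial_number, learning_rate, max_depth,
                           n_estimators, reg_alpha, reg_lambda):
    data = df_bc.value  # the broadcast pandas DataFrame, cached on the executor
    estimator = XGBRegressor(learning_rate=learning_rate,
                             max_depth=max_depth,
                             n_estimators=n_estimators,
                             reg_alpha=reg_alpha,
                             reg_lambda=reg_lambda)
    result = cross_validate(estimator,
                            data[features],
                            data[target],
                            scoring='neg_root_mean_squared_error')
    return trial_number, result['test_score'].mean()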
def objective_spark(trials):
    # Draw a hyperparameter set for each trial; keep the trial number with it.
    params = [[t.number,
               t.suggest_float("learning_rate", 0.01, 0.5),
               t.suggest_int("max_depth", 1, 100),
               t.suggest_int("n_estimators", 2, 1000),
               t.suggest_float("reg_alpha", 0, 10),
               t.suggest_float("reg_lambda", 0, 10),
               ] for t in trials]
    # One partition per parameter set, so every set runs as its own Spark task.
    rdd = spark.sparkContext.parallelize(params, len(params))
    rdd_map_result = rdd.map(lambda p: calculate_objective(*p)).collect()
    return rdd_map_result

n_batches = 50
batch_size = 8
n_startup_batches = 20
n_startup_trials = n_startup_batches*batch_size
import optuna

study = optuna.create_study(
    sampler=optuna.samplers.TPESampler(n_startup_trials=n_startup_trials,
                                       constant_liar=True, multivariate=True),
    direction='maximize')
# First get results for random hyperparameter combinations
startup_trials = [study.ask() for _ in range(n_startup_trials)]
startup_results = objective_spark(startup_trials)
for startup_trial_number, startup_result in startup_results:
    study.tell(startup_trial_number, startup_result)
print("After random parameter combinations:")
print("Best value:", study.best_value, 'Best parameters:', study.best_params)
# Let the Tree-structured Parzen Estimator come up with combinations to try
for j in range(n_startup_batches, n_batches):
    trials = [study.ask() for _ in range(batch_size)]
    batch_results = objective_spark(trials)
    for trial_number, batch_result in batch_results:
        study.tell(trial_number, batch_result)
    print("Batch", j, "Best value:", study.best_value, 'Best parameters:', study.best_params)

After random parameter combinations:
Best value: -0.5906094157978072 Best parameters: {'learning_rate': 0.19224256318198643, 'max_depth': 5, 'n_estimators': 636, 'reg_alpha': 1.6350036473663565, 'reg_lambda': 0.7751184568778258}
Batch 20 Best value: -0.5906094157978072 Best parameters: {'learning_rate': 0.19224256318198643, 'max_depth': 5, 'n_estimators': 636, 'reg_alpha': 1.6350036473663565, 'reg_lambda': 0.7751184568778258}
Batch 21 Best value: -0.5906094157978072 Best parameters: {'learning_rate': 0.19224256318198643, 'max_depth': 5, 'n_estimators': 636, 'reg_alpha': 1.6350036473663565, 'reg_lambda': 0.7751184568778258}
Batch 22 Best value: -0.58633312699559 Best parameters: {'learning_rate': 0.1326536317260198, 'max_depth': 5, 'n_estimators': 577, 'reg_alpha': 0.846742889757438, 'reg_lambda': 6.19997694747024}
Batch 23 Best value: -0.58633312699559 Best parameters: {'learning_rate': 0.1326536317260198, 'max_depth': 5, 'n_estimators': 577, 'reg_alpha': 0.846742889757438, 'reg_lambda': 6.19997694747024}
Batch 24 Best value: -0.58633312699559 Best parameters: {'learning_rate': 0.1326536317260198, 'max_depth': 5, 'n_estimators': 577, 'reg_alpha': 0.846742889757438, 'reg_lambda': 6.19997694747024}
Batch 25 Best value: -0.58633312699559 Best parameters: {'learning_rate': 0.1326536317260198, 'max_depth': 5, 'n_estimators': 577, 'reg_alpha': 0.846742889757438, 'reg_lambda': 6.19997694747024}
Batch 26 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 27 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 28 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 29 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 30 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 31 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 32 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 33 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 34 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 35 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 36 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 37 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 38 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 39 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 40 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 41 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 42 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 43 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 44 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 45 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 46 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 47 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 48 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Batch 49 Best value: -0.5757792478770037 Best parameters: {'learning_rate': 0.09846711112129035, 'max_depth': 4, 'n_estimators': 627, 'reg_alpha': 2.5413227489388426, 'reg_lambda': 6.7605526337086435}
Let’s now collect all results in a Pandas dataframe and investigate them in a parallel coordinates plot.
result_df = pd.DataFrame([x.params for x in study.get_trials()])
result_df['test_score'] = [x.values[0] for x in study.get_trials()]
result_df['iteration'] = [x.number for x in study.get_trials()]
result_df = result_df.sort_values('test_score', ascending=False)

| learning_rate | max_depth | n_estimators | reg_alpha | reg_lambda | test_score | iteration |
|---|---|---|---|---|---|---|
| 0.098467 | 4 | 627 | 2.541323 | 6.760553 | -0.575779 | 211 |
| 0.162134 | 2 | 423 | 0.071559 | 2.037565 | -0.577616 | 229 |
| 0.199394 | 3 | 275 | 0.032029 | 1.919793 | -0.579152 | 330 |
| 0.140988 | 2 | 501 | 2.361689 | 8.938440 | -0.579565 | 348 |
| 0.143023 | 2 | 445 | 1.906176 | 0.649088 | -0.580682 | 395 |
import plotly.express as px

px.parallel_coordinates(result_df[['test_score', 'learning_rate',
                                   'n_estimators', 'max_depth',
                                   'reg_alpha', 'reg_lambda']],
                        color='test_score')

When we include the iteration number in the parallel coordinates plot, we can confirm that the first 20*8=160 iterations were completely random and nicely covered the whole parameter space. Afterwards we see how the algorithm converges towards the optimal hyperparameter combination. The plotly parallel coordinates plot allows highlighting a subset of values by selecting ranges on the individual axes. I show this in the animation below.
px.parallel_coordinates(result_df[['iteration', 'learning_rate',
                                   'n_estimators', 'max_depth',
                                   'reg_alpha', 'reg_lambda']],
                        color='iteration')
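Finally, the best hyperparameters found by the study can be used to refit a model on the driver. A minimal sketch of that obvious follow-up step (not part of the original run):

# Refit a final model on the sampled data using the best hyperparameters found.
best_model = XGBRegressor(**study.best_params)
best_model.fit(df[features], df[target])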