# Pipeline API for the AutoML-Toolkit

The AutoML-Toolkit is an automated ML solution for Apache Spark. It provides common data cleansing and feature
engineering support, automated hyperparameter tuning through distributed genetic algorithms, and model tracking
integration with MLflow. It currently supports the supervised learning algorithms provided as part of Spark MLlib.

## General Overview

The AutoML toolkit exposes the following pipeline-related APIs via [FamilyRunner](src/main/scala/com/databricks/labs/automl/executor/FamilyRunner.scala):

#### [Inference using PipelineModel](#full-predict-pipeline-api) | [Inference using MLflow Run ID](#running-inference-pipeline-directly-against-an-mlflow-run-id-since-v061)
### Full Predict pipeline API:
```text
executeWithPipeline()
```
This pipeline API works with the existing configuration object (and overrides) as listed [here](APIDOCS.md),
but it returns the following output:
```text
FamilyFinalOutputWithPipeline(
  familyFinalOutput: FamilyFinalOutput,
  bestPipelineModel: Map[String, PipelineModel]
)
```
As noted, ```bestPipelineModel``` contains a key-value pair of a model family
and the best pipeline model (based on the selected ```scoringOptimizationStrategy```).

Example:
```scala
import com.databricks.labs.automl.executor.config.ConfigurationGenerator
import com.databricks.labs.automl.executor.FamilyRunner
import org.apache.spark.ml.PipelineModel

val data = spark.table("ben_demo.adult_data")
val overrides = Map(
  "labelCol" -> "label", "mlFlowLoggingFlag" -> false,
  "scalingFlag" -> true, "oneHotEncodeFlag" -> true,
  "pipelineDebugFlag" -> true
)
val randomForestConfig = ConfigurationGenerator
  .generateConfigFromMap("RandomForest", "classifier", overrides)
val runner = FamilyRunner(data, Array(randomForestConfig))
  .executeWithPipeline()

runner.bestPipelineModel("RandomForest").transform(data)

// Serialize it
runner.bestPipelineModel("RandomForest").write.overwrite().save("tmp/predict-pipeline-1")

// Load it for running inference
val pipelineModel = PipelineModel.load("tmp/predict-pipeline-1")
val predictDf = pipelineModel.transform(data)
```

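Because ```bestPipelineModel``` is keyed by model family, a run over several configurations yields one best pipeline per family, and the map can be iterated directly. A minimal sketch of that pattern, assuming the `data` and `overrides` values from the example above and assuming "GBT" as a second supported family name:

```scala
import com.databricks.labs.automl.executor.config.ConfigurationGenerator
import com.databricks.labs.automl.executor.FamilyRunner

// Assumed second configuration, built the same way as randomForestConfig
val gbtConfig = ConfigurationGenerator
  .generateConfigFromMap("GBT", "classifier", overrides)
val randomForestConfig = ConfigurationGenerator
  .generateConfigFromMap("RandomForest", "classifier", overrides)

val multiRunner = FamilyRunner(data, Array(randomForestConfig, gbtConfig))
  .executeWithPipeline()

// Persist each family's best pipeline under its own path
multiRunner.bestPipelineModel.foreach { case (family, model) =>
  model.write.overwrite().save(s"tmp/predict-pipeline-$family")
}
```

This is a sketch, not toolkit-prescribed usage; the save paths and the second family name are placeholders.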
### Feature engineering pipeline API:
```text
generateFeatureEngineeredPipeline(verbose: Boolean = false)
```
@param ```verbose```: If set to true, any dataset transformed with this feature-engineered pipeline will include all
input columns for the vector assembler stage.

This API builds a feature engineering pipeline based on the existing configuration object (and overrides)
as listed [here](APIDOCS.md). It returns output of type ```Map[String, PipelineModel]```, where ```(key -> value)``` is
```(modelFamilyName -> featureEngPipelineModel)```.
Example:
```scala
import com.databricks.labs.automl.executor.config.ConfigurationGenerator
import com.databricks.labs.automl.executor.FamilyRunner
import org.apache.spark.ml.PipelineModel

val data = spark.table("ben_demo.adult_data")
val overrides = Map(
  "labelCol" -> "label", "mlFlowLoggingFlag" -> false,
  "scalingFlag" -> true, "oneHotEncodeFlag" -> true,
  "pipelineDebugFlag" -> true
)
val randomForestConfig = ConfigurationGenerator.generateConfigFromMap("RandomForest", "classifier", overrides)
val runner = FamilyRunner(data, Array(randomForestConfig))
  .generateFeatureEngineeredPipeline(verbose = true)

runner("RandomForest")
  .write
  .overwrite()
  .save("tmp/feat-eng-pipeline-1")

val featEngDf = PipelineModel
  .load("tmp/feat-eng-pipeline-1")
  .transform(data)
```

### Running Inference Pipeline directly against an MLflow RUN ID since v0.6.1:
With this release, it is now possible to run inference given only an MLflow Run ID,
since the pipeline API now automatically registers the inference pipeline model with MLflow, along with
other useful information such as pipeline execution progress and each pipeline
stage's transformation. This can come in very handy for viewing a training pipeline's progress,
as well as for troubleshooting.

<details>
  <summary>Example of Pipeline Tags registered with MLflow</summary>

  An example of pipeline tags in MLflow
  

  And one of the transformations in a pipeline
  
</details>

#### Example (As of 0.7.1)
##### Model Pipeline MUST be trained/tuned using 0.7.1+
As of 0.7.1, the API ensures that data scientists can easily hand a model to data engineering for production with
only an MLflow Run ID. This is made possible by the addition of the full main config tracked in MLflow.
This greatly simplifies the inference pipeline, and it also makes config tracking and verification much easier.
When `mlFlowLoggingFlag` is `true`, the config is tracked on every model as per
[mlFlowLoggingMode](APIDOCS.md#mlflow-logging-mode).


Most teams follow this process:
* Data Science
  * Iterative training, testing, validation, review, tracking
  * Identification of the model to move to production
  * Submit a ticket to Data Engineering with the MLflow Run ID to productionize the model

* Data Engineering
  * Productionize the model

Below is a full pipeline example:
```scala
import com.databricks.labs.automl.executor.config.ConfigurationGenerator
import com.databricks.labs.automl.executor.FamilyRunner
import com.databricks.labs.automl.pipeline.inference.PipelineModelInference
import org.apache.spark.sql.functions.{col, exp}

// Data Science
val data = spark.table("my_database.myTrain_Data")
val overrides = Map(
  "labelCol" -> "label", "mlFlowLoggingFlag" -> true,
  "scalingFlag" -> true, "oneHotEncodeFlag" -> true
)
val randomForestConfig = ConfigurationGenerator.generateConfigFromMap("RandomForest", "classifier", overrides)
val runner = FamilyRunner(data, Array(randomForestConfig)).executeWithPipeline()

// Data Engineering
val pipelineBest = PipelineModelInference.getPipelineModelByMlFlowRunId("111825a540544443b9db14e5b9a6006b")
val prediction = pipelineBest.transform(spark.read.format("delta").load("dbfs:/.../myDataForInference"))
  .withColumn("priceReal", exp(col("price")))
  .withColumn("prediction", exp(col("prediction")))
prediction.write.format("delta").saveAsTable("my_database.newestPredictions")
```




#### Example (Deprecated as of 0.7.1):
```scala
import com.databricks.labs.automl.executor.config.ConfigurationGenerator
import com.databricks.labs.automl.executor.FamilyRunner
import org.apache.spark.ml.PipelineModel
import com.databricks.labs.automl.pipeline.inference.PipelineModelInference

val data = spark.table("ben_demo.adult_data")
val overrides = Map(
  "labelCol" -> "label", "mlFlowLoggingFlag" -> true,
  "scalingFlag" -> true, "oneHotEncodeFlag" -> true,
  "pipelineDebugFlag" -> true
)
val randomForestConfig = ConfigurationGenerator
  .generateConfigFromMap("RandomForest", "classifier", overrides)
val runner = FamilyRunner(data, Array(randomForestConfig))
  .executeWithPipeline()

val mlFlowRunId = runner.bestMlFlowRunId("RandomForest")

val loggingConfig = randomForestConfig.loggingConfig
val pipelineModel = PipelineModelInference.getPipelineModelByMlFlowRunId(mlFlowRunId, loggingConfig)
pipelineModel.transform(data.drop("label")).drop("features").show(10)
```

| 173 | + |
| 174 | +### Pipeline Configurations |
| 175 | +As noted above, all the pipeline APIs will work with the existing configuration objects. In addition to those, pipeline API |
| 176 | +exposes the following configurations: |
| 177 | + |
| 178 | +```@text |
| 179 | +default: false |
| 180 | +pipelineDebugFlag: A Boolean flag for the pipeline logging purposes. When turned on, each stage in a pipeline execution |
| 181 | +will print and log out a lot of useful information that can be used to track transformations for debugging/troubleshooting |
| 182 | +puproses. Since v0.6.1, when this flag is turned on, pipeline reports all of these transformations to Mlflow as Run tags. |
| 183 | +``` |
| 184 | + |
| 185 | + |