A tutorial to train a TensorFlow Recommenders ranking model as a TFX pipeline.
In this notebook-based tutorial, we will create and run a TFX pipeline that trains a ranking model to predict movie ratings using TensorFlow Recommenders (TFRS). The pipeline consists of three essential TFX components: ExampleGen, Trainer, and Pusher. It covers a minimal ML workflow: importing data, training a model, and exporting the trained TFRS ranking model.
We first need to install the TFX Python package and download the dataset which we will use for our model.
To avoid upgrading Pip in a system when running locally, check to make sure that we are running in Colab. Local systems can of course be upgraded separately.
import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

pip install -U tfx
pip install -U tensorflow-recommenders
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime by clicking the “RESTART RUNTIME” button above or using the “Runtime > Restart runtime …” menu. This is because of the way that Colab loads packages.
Before we define the pipeline, we need to write the model code for the Trainer component and save it in a file.
Check the TensorFlow and TFX versions.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
2022-12-14 13:03:42.352068: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-12-14 13:03:43.224089: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-12-14 13:03:43.224183: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2022-12-14 13:03:43.224193: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
TensorFlow version: 2.10.1
TFX version: 1.11.0
There are some variables used to define a pipeline. You can customize these variables as you want. By default all output from the pipeline will be generated under the current directory. Instead of using the SchemaGen component to generate a schema, for this tutorial we will create a hardcoded schema.
import os
PIPELINE_NAME = 'TFRS-ranking'
# Directory where MovieLens 100K rating data lives
DATA_ROOT = os.path.join('data', PIPELINE_NAME)
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
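As a preview of the hardcoded schema mentioned above, a schema proto can be built directly from a feature spec with schema_utils.schema_from_feature_spec; the trainer module later in this tutorial does exactly this. A minimal sketch, assuming the three integer features we will use:

import tensorflow as tf
from tensorflow_transform.tf_metadata import schema_utils

# Hardcoded feature spec used in place of a SchemaGen-generated schema.
feature_spec = {
    'userId': tf.io.FixedLenFeature(shape=[1], dtype=tf.int64),
    'movieId': tf.io.FixedLenFeature(shape=[1], dtype=tf.int64),
    'rating': tf.io.FixedLenFeature(shape=[1], dtype=tf.int64),
}
schema = schema_utils.schema_from_feature_spec(feature_spec)
print(schema)  # A tensorflow_metadata Schema proto.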
Since TFX does not currently support the TensorFlow Datasets API, we will download the MovieLens 100K dataset manually for use in our TFX pipeline.

There are four numeric features in this dataset:

- userId
- movieId
- rating
- timestamp

We will build a ranking model that predicts movie ratings. We will not use the timestamp feature.
Because TFX ExampleGen reads inputs from a directory, we need to create a directory and copy the dataset to it.
wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
mkdir -p {DATA_ROOT}
unzip ml-100k.zip
echo 'userId,movieId,rating,timestamp' > {DATA_ROOT}/ratings.csv
sed 's/\t/,/g' ml-100k/u.data >> {DATA_ROOT}/ratings.csv
--2022-12-14 13:03:46--  https://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: ‘ml-100k.zip’

ml-100k.zip         100%[===================>]   4.70M  31.0MB/s    in 0.2s

2022-12-14 13:03:47 (31.0 MB/s) - ‘ml-100k.zip’ saved [4924029/4924029]

Archive:  ml-100k.zip
   creating: ml-100k/
  inflating: ml-100k/allbut.pl
  inflating: ml-100k/mku.sh
  inflating: ml-100k/README
  inflating: ml-100k/u.data
  inflating: ml-100k/u.genre
  inflating: ml-100k/u.info
  inflating: ml-100k/u.item
  inflating: ml-100k/u.occupation
  inflating: ml-100k/u.user
  inflating: ml-100k/u1.base
  inflating: ml-100k/u1.test
  inflating: ml-100k/u2.base
  inflating: ml-100k/u2.test
  inflating: ml-100k/u3.base
  inflating: ml-100k/u3.test
  inflating: ml-100k/u4.base
  inflating: ml-100k/u4.test
  inflating: ml-100k/u5.base
  inflating: ml-100k/u5.test
  inflating: ml-100k/ua.base
  inflating: ml-100k/ua.test
  inflating: ml-100k/ub.base
  inflating: ml-100k/ub.test
Take a quick look at the CSV file.
head {DATA_ROOT}/ratings.csv
userId,movieId,rating,timestamp
196,242,3,881250949
186,302,3,891717742
22,377,1,878887116
244,51,2,880606923
166,346,1,886397596
298,474,4,884182806
115,265,2,881171488
253,465,5,891628467
305,451,3,886324817
Each row contains four values. For example, the first row means that user ‘196’ gave a rating of 3 to movie ‘242’.
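As a quick sanity check, the MovieLens 100K dataset contains exactly 100,000 ratings, so the CSV we just wrote should have 100,001 lines including the header. A short snippet to verify this, using the DATA_ROOT variable defined earlier:

# Count lines in the generated CSV: 100,000 ratings plus one header line.
with open(os.path.join(DATA_ROOT, 'ratings.csv')) as f:
  print(sum(1 for _ in f))  # Expected: 100001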
TFX pipelines are defined using Python APIs. We will define a pipeline consisting of the following three components: CsvExampleGen, Trainer, and Pusher.

Before actually defining the pipeline, we need to write the model code for the Trainer component.
We will build a simple ranking model to predict movie ratings. This model training code will be saved to a separate file.
In this tutorial we will use the Generic Trainer of TFX, which supports Keras-based models. You need to write a Python file containing a run_fn function, which is the entry point for the Trainer component.
_trainer_module_file = 'tfrs_ranking_trainer.py'
The ranking model we use is almost exactly the same as in the Basic Ranking tutorial. The only difference is that we use movie IDs instead of movie titles in the candidate tower.
%%writefile {_trainer_module_file}

from typing import Dict, List, Text

import numpy as np
import tensorflow as tf
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_recommenders as tfrs
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = ['userId', 'movieId']
_LABEL_KEY = 'rating'

_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
        for feature in _FEATURE_KEYS
    },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


class RankingModel(tf.keras.Model):

  def __init__(self):
    super().__init__()
    embedding_dimension = 32

    # MovieLens 100K contains 943 users and 1682 movies.
    unique_user_ids = np.array(range(943)).astype(str)
    unique_movie_ids = np.array(range(1682)).astype(str)

    # Compute embeddings for users.
    self.user_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='userId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_user_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_user_ids) + 1, embedding_dimension)
    ])

    # Compute embeddings for movies.
    self.movie_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='movieId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_movie_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_movie_ids) + 1, embedding_dimension)
    ])

    # Compute predictions.
    self.ratings = tf.keras.Sequential([
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1)
    ])

  def call(self, inputs):
    user_id, movie_id = inputs
    user_embedding = self.user_embeddings(user_id)
    movie_embedding = self.movie_embeddings(movie_id)
    return self.ratings(tf.concat([user_embedding, movie_embedding], axis=2))


class MovielensModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()
    self.ranking_model: tf.keras.Model = RankingModel()
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])

  def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
    return self.ranking_model((features['userId'], features['movieId']))

  def compute_loss(self,
                   features: Dict[Text, tf.Tensor],
                   training=False) -> tf.Tensor:
    # The dataset built in _input_fn yields (features, label) tuples, so
    # index 0 holds the feature dict and index 1 holds the rating label.
    labels = features[1]
    rating_predictions = self(features[0])

    # The task computes the loss and the metrics.
    return self.task(labels=labels, predictions=rating_predictions)


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 256) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  return MovielensModel()


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema, batch_size=8192)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema, batch_size=4096)

  model = _build_keras_model()
  model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      epochs=3,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  model.save(fn_args.serving_model_dir)
Writing tfrs_ranking_trainer.py
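Before wiring this module into a pipeline, it can be useful to smoke-test the model class directly. A minimal sketch, assuming the current working directory is on the Python path (as it is in Colab) so the freshly written tfrs_ranking_trainer module is importable:

import tensorflow as tf
import tfrs_ranking_trainer

# Instantiate the model and run a single forward pass on a dummy batch.
model = tfrs_ranking_trainer.MovielensModel()
features = {
    'userId': tf.constant([[42]], dtype=tf.int64),
    'movieId': tf.constant([[15]], dtype=tf.int64),
}
predictions = model(features)
print(predictions.shape)  # (1, 1, 1): one predicted rating per (user, movie) pair.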
Now you have completed all preparation steps to build the TFX pipeline.
We define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline, which can be run using one of the pipeline orchestration systems that TFX supports.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three-component pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses a user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=12),
      eval_args=tfx.proto.EvalArgs(num_steps=24))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # The following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)
TFX supports multiple orchestrators to run pipelines. In this tutorial we will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment.
Now we create a LocalDagRunner and pass a Pipeline object created from the function we already defined.
The pipeline runs directly and you can see logs for the progress of the pipeline including ML model training.
tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=_trainer_module_file,
        serving_model_dir=SERVING_MODEL_DIR,
        metadata_path=METADATA_PATH))
INFO:absl:Generating ephemeral wheel package for '/tmpfs/src/temp/docs/examples/tfrs_ranking_trainer.py' (including modules: ['tfrs_ranking_trainer']).
INFO:absl:User module package has hash fingerprint version bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.
INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '/tmpfs/tmp/tmpbks2_kfl/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmpfs/tmp/tmpbvugqpwn', '--dist-dir', '/tmpfs/tmp/tmpfqmceeau']
/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/setuptools/command/install.py:34: SetuptoolsDeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools.
warnings.warn(
INFO:absl:Successfully built user code wheel distribution at 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'; target user module is 'tfrs_ranking_trainer'.
INFO:absl:Full user module path is 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'
INFO:absl:Using deployment config:
executor_specs {
key: "CsvExampleGen"
value {
beam_executable_spec {
python_executor_spec {
class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
}
}
}
}
executor_specs {
key: "Pusher"
value {
python_class_executable_spec {
class_path: "tfx.components.pusher.executor.Executor"
}
}
}
executor_specs {
key: "Trainer"
value {
python_class_executable_spec {
class_path: "tfx.components.trainer.executor.GenericExecutor"
}
}
}
custom_driver_specs {
key: "CsvExampleGen"
value {
python_class_executable_spec {
class_path: "tfx.components.example_gen.driver.FileBasedDriver"
}
}
}
metadata_connection_config {
database_connection_config {
sqlite {
filename_uri: "metadata/TFRS-ranking/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
}
}
INFO:absl:Using connection config:
sqlite {
filename_uri: "metadata/TFRS-ranking/metadata.db"
connection_mode: READWRITE_OPENCREATE
}
INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
type {
name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
}
id: "CsvExampleGen"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.CsvExampleGen"
}
}
}
}
outputs {
outputs {
key: "examples"
value {
artifact_spec {
type {
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data/TFRS-ranking"
}
}
}
parameters {
key: "input_config"
value {
field_value {
string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}"
}
}
}
parameters {
key: "output_config"
value {
field_value {
string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}"
}
}
}
parameters {
key: "output_data_format"
value {
field_value {
int_value: 6
}
}
}
parameters {
key: "output_file_format"
value {
field_value {
int_value: 5
}
}
}
}
downstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying tfrs_ranking_trainer.py -> build/lib
installing to /tmpfs/tmp/tmpbvugqpwn
running install
running install_lib
copying build/lib/tfrs_ranking_trainer.py -> /tmpfs/tmp/tmpbvugqpwn
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3.9.egg-info
running install_scripts
creating /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL
creating '/tmpfs/tmp/tmpfqmceeau/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' and adding '/tmpfs/tmp/tmpbvugqpwn' to it
adding 'tfrs_ranking_trainer.py'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/METADATA'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/top_level.txt'
adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/RECORD'
removing /tmpfs/tmp/tmpbvugqpwn
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[CsvExampleGen] Resolved inputs: ({},)
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1"
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
, artifact_type: name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': 'data/TFRS-ranking', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027'}, execution_output_uri='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
type {
name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
}
id: "CsvExampleGen"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.CsvExampleGen"
}
}
}
}
outputs {
outputs {
key: "examples"
value {
artifact_spec {
type {
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
}
}
}
}
}
parameters {
parameters {
key: "input_base"
value {
field_value {
string_value: "data/TFRS-ranking"
}
}
}
parameters {
key: "input_config"
value {
field_value {
string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}"
}
}
}
parameters {
key: "output_config"
value {
field_value {
string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}"
}
}
}
parameters {
key: "output_data_format"
value {
field_value {
int_value: 6
}
}
}
parameters {
key: "output_file_format"
value {
field_value {
int_value: 5
}
}
}
}
downstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
, pipeline_info=id: "TFRS-ranking"
, pipeline_run_id='2022-12-14T13:03:48.504573')
INFO:absl:Generating examples.
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
INFO:absl:Processing input csv data data/TFRS-ranking/* to TFExample.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1"
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
, artifact_type: name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component Trainer is running.
INFO:absl:Running launcher for node_info {
type {
name: "tfx.components.trainer.component.Trainer"
base_type: TRAIN
}
id: "Trainer"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.Trainer"
}
}
}
}
inputs {
inputs {
key: "examples"
value {
channels {
producer_node_query {
id: "CsvExampleGen"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.CsvExampleGen"
}
}
}
artifact_query {
type {
name: "Examples"
base_type: DATASET
}
}
output_key: "examples"
}
min_count: 1
}
}
}
outputs {
outputs {
key: "model"
value {
artifact_spec {
type {
name: "Model"
base_type: MODEL
}
}
}
}
outputs {
key: "model_run"
value {
artifact_spec {
type {
name: "ModelRun"
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "eval_args"
value {
field_value {
string_value: "{\n \"num_steps\": 24\n}"
}
}
}
parameters {
key: "module_path"
value {
field_value {
string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl"
}
}
}
parameters {
key: "train_args"
value {
field_value {
string_value: "{\n \"num_steps\": 12\n}"
}
}
}
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[Trainer] Resolved inputs: ({'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1"
properties {
key: "split_names"
value {
string_value: "[\"train\", \"eval\"]"
}
}
custom_properties {
key: "file_format"
value {
string_value: "tfrecords_gzip"
}
}
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027"
}
}
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "payload_format"
value {
string_value: "FORMAT_TF_EXAMPLE"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.11.0"
}
}
state: LIVE
create_time_since_epoch: 1671023043350
last_update_time_since_epoch: 1671023043350
, artifact_type: id: 15
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1"
properties {
key: "split_names"
value {
string_value: "[\"train\", \"eval\"]"
}
}
custom_properties {
key: "file_format"
value {
string_value: "tfrecords_gzip"
}
}
custom_properties {
key: "input_fingerprint"
value {
string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027"
}
}
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "payload_format"
value {
string_value: "FORMAT_TF_EXAMPLE"
}
}
custom_properties {
key: "span"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.11.0"
}
}
state: LIVE
create_time_since_epoch: 1671023043350
last_update_time_since_epoch: 1671023043350
, artifact_type: id: 15
name: "Examples"
properties {
key: "span"
value: INT
}
properties {
key: "split_names"
value: STRING
}
properties {
key: "version"
value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2"
, artifact_type: name: "Model"
base_type: MODEL
)]}), exec_properties={'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'}, execution_output_uri='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Trainer/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/.temp/', pipeline_node=node_info {
type {
name: "tfx.components.trainer.component.Trainer"
base_type: TRAIN
}
id: "Trainer"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.Trainer"
}
}
}
}
inputs {
inputs {
key: "examples"
value {
channels {
producer_node_query {
id: "CsvExampleGen"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.CsvExampleGen"
}
}
}
artifact_query {
type {
name: "Examples"
base_type: DATASET
}
}
output_key: "examples"
}
min_count: 1
}
}
}
outputs {
outputs {
key: "model"
value {
artifact_spec {
type {
name: "Model"
base_type: MODEL
}
}
}
}
outputs {
key: "model_run"
value {
artifact_spec {
type {
name: "ModelRun"
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "eval_args"
value {
field_value {
string_value: "{\n \"num_steps\": 24\n}"
}
}
}
parameters {
key: "module_path"
value {
field_value {
string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl"
}
}
}
parameters {
key: "train_args"
value {
field_value {
string_value: "{\n \"num_steps\": 12\n}"
}
}
}
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "Pusher"
execution_options {
caching_options {
}
}
, pipeline_info=id: "TFRS-ranking"
, pipeline_run_id='2022-12-14T13:03:48.504573')
INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:udf_utils.get_fn {'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'} 'run_fn'
INFO:absl:Installing 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '-m', 'pip', 'install', '--target', '/tmpfs/tmp/tmpkwsj4cmv', 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl']
Processing ./pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl
INFO:absl:Successfully installed 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature movieId has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
size: 1
}
. Setting to DenseTensor.
Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c
INFO:absl:Feature movieId has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature movieId has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature movieId has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature rating has a shape dim {
size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature userId has a shape dim {
size: 1
}
. Setting to DenseTensor.
Epoch 1/3
12/12 [==============================] - 4s 247ms/step - root_mean_squared_error: 1.9485 - loss: 3.6361 - regularization_loss: 0.0000e+00 - total_loss: 3.6361 - val_root_mean_squared_error: 1.2136 - val_loss: 1.4467 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.4467
Epoch 2/3
12/12 [==============================] - 2s 198ms/step - root_mean_squared_error: 1.1657 - loss: 1.3525 - regularization_loss: 0.0000e+00 - total_loss: 1.3525 - val_root_mean_squared_error: 1.1173 - val_loss: 1.2615 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2615
Epoch 3/3
12/12 [==============================] - 2s 212ms/step - root_mean_squared_error: 1.1121 - loss: 1.2330 - regularization_loss: 0.0000e+00 - total_loss: 1.2330 - val_root_mean_squared_error: 1.0983 - val_loss: 1.2303 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2303
WARNING:absl:Function `_wrapped_model` contains input name(s) movieId, userId with unsupported characters which will be renamed to movieid, userid in the SavedModel.
WARNING:absl:Found untraced functions such as ranking_layer_call_fn, ranking_layer_call_and_return_conditional_losses while saving (showing 2 of 2). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets
INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/TFRS-ranking/Trainer/model/2/Format-Serving. ModelRun written to pipelines/TFRS-ranking/Trainer/model_run/2
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2"
, artifact_type: name: "Model"
base_type: MODEL
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component Pusher is running.
INFO:absl:Running launcher for node_info {
type {
name: "tfx.components.pusher.component.Pusher"
base_type: DEPLOY
}
id: "Pusher"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.Pusher"
}
}
}
}
inputs {
inputs {
key: "model"
value {
channels {
producer_node_query {
id: "Trainer"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.Trainer"
}
}
}
artifact_query {
type {
name: "Model"
base_type: MODEL
}
}
output_key: "model"
}
}
}
}
outputs {
outputs {
key: "pushed_model"
value {
artifact_spec {
type {
name: "PushedModel"
base_type: MODEL
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "push_destination"
value {
field_value {
string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}"
}
}
}
}
upstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
INFO:absl:MetadataStore with DB connection initialized
WARNING:absl:ArtifactQuery.property_predicate is not supported.
INFO:absl:[Pusher] Resolved inputs: ({'model': [Artifact(artifact: id: 3
type_id: 18
uri: "pipelines/TFRS-ranking/Trainer/model/2"
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.11.0"
}
}
state: LIVE
create_time_since_epoch: 1671023059840
last_update_time_since_epoch: 1671023059840
, artifact_type: id: 18
name: "Model"
base_type: MODEL
)]},)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'model': [Artifact(artifact: id: 3
type_id: 18
uri: "pipelines/TFRS-ranking/Trainer/model/2"
custom_properties {
key: "is_external"
value {
int_value: 0
}
}
custom_properties {
key: "state"
value {
string_value: "published"
}
}
custom_properties {
key: "tfx_version"
value {
string_value: "1.11.0"
}
}
state: LIVE
create_time_since_epoch: 1671023059840
last_update_time_since_epoch: 1671023059840
, artifact_type: id: 18
name: "Model"
base_type: MODEL
)]}, output_dict=defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3"
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}), exec_properties={'push_destination': '{\n "filesystem": {\n "base_directory": "serving_model/TFRS-ranking"\n }\n}', 'custom_config': 'null'}, execution_output_uri='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Pusher/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/.temp/', pipeline_node=node_info {
type {
name: "tfx.components.pusher.component.Pusher"
base_type: DEPLOY
}
id: "Pusher"
}
contexts {
contexts {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
contexts {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
contexts {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.Pusher"
}
}
}
}
inputs {
inputs {
key: "model"
value {
channels {
producer_node_query {
id: "Trainer"
}
context_queries {
type {
name: "pipeline"
}
name {
field_value {
string_value: "TFRS-ranking"
}
}
}
context_queries {
type {
name: "pipeline_run"
}
name {
field_value {
string_value: "2022-12-14T13:03:48.504573"
}
}
}
context_queries {
type {
name: "node"
}
name {
field_value {
string_value: "TFRS-ranking.Trainer"
}
}
}
artifact_query {
type {
name: "Model"
base_type: MODEL
}
}
output_key: "model"
}
}
}
}
outputs {
outputs {
key: "pushed_model"
value {
artifact_spec {
type {
name: "PushedModel"
base_type: MODEL
}
}
}
}
}
parameters {
parameters {
key: "custom_config"
value {
field_value {
string_value: "null"
}
}
}
parameters {
key: "push_destination"
value {
field_value {
string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}"
}
}
}
}
upstream_nodes: "Trainer"
execution_options {
caching_options {
}
}
, pipeline_info=id: "TFRS-ranking"
, pipeline_run_id='2022-12-14T13:03:48.504573')
WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.
INFO:absl:Model version: 1671023059
INFO:absl:Model written to serving path serving_model/TFRS-ranking/1671023059.
INFO:absl:Model pushed to pipelines/TFRS-ranking/Pusher/pushed_model/3.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3"
, artifact_type: name: "PushedModel"
base_type: MODEL
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Pusher is finished.
You should see “INFO:absl:Component Pusher is finished.” at the end of the logs if the pipeline finished successfully, because the Pusher component is the last component of the pipeline.

The Pusher component pushes the trained model to SERVING_MODEL_DIR, which is the serving_model/TFRS-ranking directory if you did not change the variables in the previous steps. You can see the result in the file browser in the left-side panel in Colab, or by using the following command:
# List files in created model directory.
ls -R {SERVING_MODEL_DIR}
serving_model/TFRS-ranking:
1671023059

serving_model/TFRS-ranking/1671023059:
assets  keras_metadata.pb  saved_model.pb  variables

serving_model/TFRS-ranking/1671023059/assets:

serving_model/TFRS-ranking/1671023059/variables:
variables.data-00000-of-00001  variables.index
Now we can test the ranking model by computing predictions for a user and a movie:
import glob

# Load the latest pushed model for testing.
loaded = tf.saved_model.load(
    max(glob.glob(os.path.join(SERVING_MODEL_DIR, '*/')), key=os.path.getmtime))
print(loaded({'userId': [[42]], 'movieId': [[15]]}).numpy())
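Since this is a ranking model, a natural follow-up is to score a set of candidate movies for one user and order them by predicted rating. A small sketch along those lines (the user and candidate IDs below are arbitrary examples):

user_id = 42
candidate_movie_ids = [15, 25, 50, 100, 181]

# Score each (user, movie) pair with the loaded model and rank the candidates.
scores = {
    movie_id: float(loaded({'userId': [[user_id]], 'movieId': [[movie_id]]}).numpy())
    for movie_id in candidate_movie_ids
}
for movie_id, score in sorted(scores.items(), key=lambda kv: kv[1], reverse=True):
  print(f'movieId {movie_id}: predicted rating {score:.2f}')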