Real-time Models (Kafka)
For high-throughput, real-time models (e.g., models with an HTTP endpoint such as POST /predict serving billions of predictions per day), you can stream predictions to Kafka or another message broker, and then have a separate process store them in persistent storage. Using a message broker such as Kafka lets you record predictions of real-time models with low latency, since the endpoint only has to publish a message instead of writing to storage directly.
Don't have billions of predictions?
If you are not dealing with billions of predictions per day, you should consider a simpler solution.
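For illustration, one possible simpler approach (an assumption, not prescribed by this guide) is to write each prediction directly to a database from the serving endpoint. A minimal sketch, assuming a PostgreSQL database with a hypothetical model_predictions table and the psycopg2 driver:
import json
import uuid
import psycopg2  # hypothetical choice of driver

conn = psycopg2.connect("dbname=ml user=ml")  # hypothetical connection string

def store_prediction(inputs: dict, outputs: dict):
    # Insert a single prediction row; fine for modest traffic,
    # but avoid doing this on the hot path at very high throughput.
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO model_predictions (id, model_name, model_version, inputs, outputs) "
            "VALUES (%s, %s, %s, %s, %s)",
            (str(uuid.uuid4()), "my-model", "v1", json.dumps(inputs), json.dumps(outputs)),
        )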
You can deploy Kafka in various ways, and regardless of how it is deployed, writing messages to a Kafka topic is straightforward in Python and other languages. Here are examples for Flask and FastAPI, which are commonly used to serve ML models.
With Flask, using the kafka-python client:
import json
import uuid

from flask import Flask, request
from kafka import KafkaProducer

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers="kafka-cp-kafka:9092")

@app.route("/predict", methods=["POST"])
def predict():
    ...
    # Publish the prediction to the "my-model" topic as a JSON-encoded message
    producer.send("my-model", json.dumps({
        "id": str(uuid.uuid4()),
        "model_name": "my-model",
        "model_version": "v1",
        "inputs": {
            "age": 38,
            "previously_insured": True,
        },
        "outputs": {
            "will_buy_insurance": True,
            "confidence": 0.98,
        },
    }).encode("ascii"))
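Note that kafka-python's send() is asynchronous: it only enqueues the message in an internal buffer and sends it in the background. If you need to make sure buffered messages have actually been delivered (for example, before the process shuts down), you can flush the producer:
# Block until all buffered messages have been sent (or the timeout expires)
producer.flush(timeout=10)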
With FastAPI, you can use aiokafka to create an asynchronous producer when the application starts:
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI
import json
import uuid

app = FastAPI()
aioproducer = None

@app.on_event("startup")
async def startup_event():
    global aioproducer
    aioproducer = AIOKafkaProducer(bootstrap_servers="my-kafka:9092")
    await aioproducer.start()

@app.on_event("shutdown")
async def shutdown_event():
    await aioproducer.stop()
Then, whenever you have a new prediction, you can publish it to a Kafka topic:
@app.post("/predict")
async def predict(request: PredictRequest):  # PredictRequest: a Pydantic request model defined elsewhere
    ...
    # Publish the prediction to the "my-model" topic as a JSON-encoded message
    await aioproducer.send("my-model", json.dumps({
        "id": str(uuid.uuid4()),
        "model_name": "my-model",
        "model_version": "v1",
        "inputs": {
            "age": 38,
            "previously_insured": True,
        },
        "outputs": {
            "will_buy_insurance": True,
            "confidence": 0.98,
        },
    }).encode("ascii"))
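Note that await aioproducer.send(...) returns once the message has been scheduled for sending, not when the broker has acknowledged it. If you want to wait for the acknowledgement before responding, aiokafka also offers send_and_wait; a minimal sketch (payload is a hypothetical name for the JSON-encoded bytes built above):
# Wait until the broker confirms the write before returning the response
await aioproducer.send_and_wait("my-model", payload)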
Now you can stream predictions from Kafka to persistent storage such as S3. There are different ways to achieve this; here we'll cover Spark Streaming and Kafka Connect.
Spark Streaming is an extension of the core Spark API that allows you to process real-time data from various sources including Kafka. This processed data can be pushed out to file systems and databases.
In this example, we will process messages from the my-model topic and store them in a Delta Lake table:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, IntegerType, StructType

# Create stream with Kafka source (assumes an existing SparkSession named `spark`)
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "my-kafka:9092") \
    .option("subscribe", "my-model") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse JSON from Kafka
schema = StructType() \
    .add("sepal_length", FloatType()) \
    .add("sepal_width", FloatType()) \
    .add("petal_length", FloatType()) \
    .add("petal_width", FloatType()) \
    .add("prediction", IntegerType()) \
    .add("confidence", FloatType())

df = df.withColumn("json", F.from_json(F.col("value").cast("string"), schema))
df = df.select(F.col("json.*"))

# Write to Delta Lake
df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", f"{S3_BASE_URL}/my-model/serving/_checkpoints/kafka") \
    .start(f"{S3_BASE_URL}/my-model/serving") \
    .awaitTermination()
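Once the stream is running, the stored predictions can be queried like any other Delta Lake table. A sketch, assuming the same S3_BASE_URL path and SparkSession as above:
# Load the predictions table and inspect the most recent rows
predictions = spark.read.format("delta").load(f"{S3_BASE_URL}/my-model/serving")
predictions.show(10)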
Kafka Connect makes it easy to quickly define connectors that move data between Kafka and other data systems, such as S3 and Elasticsearch.
As a prerequisite for this connector, you'll also need Schema Registry, a tool for managing schemas for Kafka topics (the Parquet format requires a schema for the messages).
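For example, a schema for the my-model topic could be registered through Schema Registry's REST API. A minimal sketch in Python, where the registry URL, subject name, and the JSON Schema itself are assumptions:
import json
import requests

# Hypothetical JSON Schema describing the prediction messages
value_schema = {
    "type": "object",
    "properties": {
        "sepal_length": {"type": "number"},
        "sepal_width": {"type": "number"},
        "petal_length": {"type": "number"},
        "petal_width": {"type": "number"},
        "prediction": {"type": "integer"},
        "confidence": {"type": "number"},
    },
}

# Register the schema under the subject used for the topic's message values
resp = requests.post(
    "http://my-schema-registry/subjects/my-model-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schemaType": "JSON", "schema": json.dumps(value_schema)},
)
resp.raise_for_status()
print(resp.json())  # e.g. {"id": 1}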
Here is an example of a connector configuration that streams messages from the my-model topic to Parquet files on S3:
PUT /connectors/my-model-connector/config
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.region": "us-east-1",
  "s3.bucket.name": "myorg-models",
  "topics.dir": "my-model/serving",
  "flush.size": "2",
  "rotate.schedule.interval.ms": "20000",
  "auto.register.schemas": "false",
  "tasks.max": "1",
  "s3.part.size": "5242880",
  "timezone": "UTC",
  "parquet.codec": "snappy",
  "topics": "my-model",
  "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
  "format.class": "parquet",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "schema.registry.url": "http://my-schema-registry",
  "value.converter.schema.registry.url": "http://my-schema-registry"
}
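This configuration is applied by sending it to the Kafka Connect REST API. A minimal sketch in Python, assuming the Connect worker's REST endpoint is reachable at http://my-kafka-connect:8083 (the hostname and port are assumptions):
import requests

connector_config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "my-model",
    # ... remaining settings from the configuration above ...
}

# Create or update the connector with a PUT to /connectors/<name>/config
resp = requests.put(
    "http://my-kafka-connect:8083/connectors/my-model-connector/config",
    json=connector_config,
)
resp.raise_for_status()
print(resp.json()["name"], "configured")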