For high-throughput, real-time models (e.g., models served behind an HTTP endpoint such as POST /predict that handle billions of predictions per day), you can stream predictions to Kafka or another message broker, and have a separate process write them to persistent storage.
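Publishing to a message broker such as Kafka keeps the latency added to each prediction request low, because the serving process only hands the message to the broker and does not wait for the write to persistent storage.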
Don't have billions of predictions?
If you are not dealing with billions of predictions per day, you should consider a simpler solution.
Writing messages to a Kafka topic is straightforward in Python and other languages. Here are examples for Flask and FastAPI, which are commonly used to serve ML models.
Flask
With Flask, you can use the kafka-python library. Example:
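A minimal sketch of this setup, assuming a broker at localhost:9092 and the my-model topic used in the rest of this page. The record fields and the names `build_record`, `serialize`, `make_producer`, `create_app`, and `predict_fn` are hypothetical, chosen for illustration only; adapt them to your model's inputs and outputs.

```python
import json
import time
import uuid


def build_record(features, prediction):
    """Bundle one prediction with an ID and a timestamp (hypothetical fields)."""
    return {
        "prediction_id": str(uuid.uuid4()),
        "timestamp": int(time.time() * 1000),  # epoch milliseconds
        "features": features,
        "prediction": prediction,
    }


def serialize(record):
    """Encode a record as UTF-8 JSON bytes, the payload written to the topic."""
    return json.dumps(record).encode("utf-8")


def make_producer(bootstrap_servers="localhost:9092"):
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=serialize,
    )


def create_app(producer, predict_fn, topic="my-model"):
    """Build a Flask app whose POST /predict route publishes each prediction."""
    from flask import Flask, jsonify, request

    app = Flask(__name__)

    @app.route("/predict", methods=["POST"])
    def predict():
        features = request.get_json()
        prediction = predict_fn(features)
        # send() is asynchronous: it buffers the message and returns a future,
        # so the HTTP response is not blocked on the broker.
        producer.send(topic, build_record(features, prediction))
        return jsonify({"prediction": prediction})

    return app


# Example wiring (needs a reachable broker; the lambda stands in for a model):
#   app = create_app(make_producer(), predict_fn=lambda features: 0.5)
#   app.run(port=5000)
```

Passing the producer into `create_app` keeps a single producer per process and makes it easy to substitute a stub in tests.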
Now you can stream predictions from Kafka to persistent storage such as S3. There are different ways to achieve this - this section covers Spark Streaming and Kafka Connect.
Spark Streaming
Spark Streaming is an extension of the core Spark API that allows you to process real-time data from various sources including Kafka. This processed data can be pushed out to file systems and databases.
In this example, we will process messages from the my-model topic and store them in a Delta Lake table:
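One way this could look with PySpark is sketched below. It uses Structured Streaming, Spark's DataFrame-based streaming API, and assumes that the Kafka source connector (spark-sql-kafka-0-10) and the Delta Lake package are available to the Spark session. The broker address, the schema fields, and the `run` function with its path arguments are assumptions for illustration, not part of the original setup.

```python
# Options for Spark's Kafka source; the broker address is an assumption.
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "my-model",
    "startingOffsets": "earliest",
}

# DDL-style schema for the JSON message payload (hypothetical fields).
PREDICTION_SCHEMA = (
    "prediction_id STRING, timestamp LONG, "
    "features MAP<STRING, DOUBLE>, prediction DOUBLE"
)


def run(table_path, checkpoint_path):
    """Stream messages from the my-model topic into a Delta Lake table."""
    # Imported here so the constants above can be inspected without PySpark.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json

    spark = SparkSession.builder.appName("my-model-predictions").getOrCreate()

    # Each Kafka row exposes the message payload as bytes in the "value" column.
    raw = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()

    # Decode the bytes to a string, parse the JSON, and flatten it into columns.
    parsed = raw.select(
        from_json(col("value").cast("string"), PREDICTION_SCHEMA).alias("p")
    ).select("p.*")

    # The checkpoint lets the query resume from its last committed offsets.
    query = (
        parsed.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .start(table_path)
    )
    query.awaitTermination()


# Example call (paths are placeholders):
#   run("s3a://myorg-models/my-model/serving", "s3a://myorg-models/my-model/_checkpoints")
```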
Kafka Connect
Kafka Connect makes it easy to define connectors that move data between Kafka and other data systems, such as S3 and Elasticsearch.
As a prerequisite to Kafka Connect, you'll need Schema Registry, which is a tool to manage schemas for Kafka topics.
Here is an example of a connector that streams messages from the my-model topic to Parquet files on S3:
PUT /connectors/my-model-connector/config

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.region": "us-east-1",
  "s3.bucket.name": "myorg-models",
  "topics.dir": "my-model/serving",
  "flush.size": "2",
  "rotate.schedule.interval.ms": "20000",
  "auto.register.schemas": "false",
  "tasks.max": "1",
  "s3.part.size": "5242880",
  "timezone": "UTC",
  "parquet.codec": "snappy",
  "topics": "my-model",
  "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "schema.registry.url": "http://my-schema-registry",
  "value.converter.schema.registry.url": "http://my-schema-registry"
}
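If you prefer to submit the connector configuration from a script, the request can be sent to the Kafka Connect REST API with Python's standard library. This is a sketch under assumptions: the Connect worker address (http://localhost:8083 in the usage comment) and the helper names `build_config_request` and `apply_config` are hypothetical.

```python
import json
import urllib.request


def build_config_request(connect_url, name, config):
    """Build a PUT request that creates or updates a connector's config."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def apply_config(connect_url, name, config):
    """Send the request and return the parsed JSON response from Kafka Connect."""
    request = build_config_request(connect_url, name, config)
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Example call (needs a running Connect worker; the address is an assumption):
#   apply_config("http://localhost:8083", "my-model-connector", {"topics": "my-model"})
```

Because PUT on the config endpoint creates the connector if it does not exist and updates it otherwise, the same script can be re-run after changing the configuration.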