Streaming ML Inference: Real-Time Predictions with Apache Flink
Your application needs predictions right now, not in a batch job that runs at 2 AM. Whether you're scoring transactions for fraud, ranking content in a feed, or triggering alerts on sensor anomalies, streaming ML inference is where latency matters and every millisecond costs money or user satisfaction.
Here's the problem: getting ML models into a streaming data pipeline is deceptively hard. Do you bake the model directly into your Flink job? Ship predictions off to a GPU-backed inference server? Cache feature state? Handle model updates without stopping the pipeline? Each choice trades latency against resource utilization, and mistakes here burn capital fast.
We're going to walk through the real production patterns that work: embedding models in Flink operators, calling external inference services asynchronously, managing feature state efficiently, and swapping model weights without job restarts. By the end, you'll have battle-tested code and a clear mental model for when to use each approach.
Table of Contents
- The Dual Architecture: Embedded vs. External Model Serving
- Embedded Model Serving: Speed First
- External Model Serving: Scale First
- AsyncFunction: Calling Models Without Blocking the Pipeline
- The Pattern
- Key Configuration Levers
- State Management for Feature Accumulation
- Keyed State in Flink
- State Backend: RocksDB for Production
- Model Updates Without Job Restart
- The Broadcast Pattern
- Version Tracking for Safe Rollouts
- Architecture Diagrams
- Embedded Model Serving
- External Model Serving (AsyncFunction)
- Feature Accumulation + Async Inference
- Model Update with BroadcastProcessFunction
- Latency and Throughput Benchmarks
- Setup
- Embedded ONNX Runtime (In-Flink)
- External TorchServe (AsyncFunction)
- Combined: Features (State) + Async Inference
- Optimization Insights
- Production Checklist
- Real-World Complexity: What We Didn't Cover
- Managing Model Versioning Across Deployments
- Backpressure and Flow Control
- Testing Streaming Pipelines
- Operational Patterns
- The Two-Model Pattern
- Circuit Breaker Pattern
- Key Takeaways
- Closing Thoughts on Production Readiness
- The Scaling Reality: What Happens When Production Traffic Arrives
- Real-World Failure Modes and Recovery Patterns
- Debugging Streaming Inference Issues
- The Lifecycle of a Streaming ML System
The Dual Architecture: Embedded vs. External Model Serving
First, let's be honest: there's no single right answer. You'll likely use both patterns, in the same pipeline, for different models.
Embedded Model Serving: Speed First
When you embed a model directly in a Flink operator, you get brutally low latency. We're talking microseconds for inference after deserialization. No network hop. No serialization overhead. The model lives in the operator's process memory, warm and ready.
The catch? You're betting on CPU inference. GPU support in Flink is... limited. You also can't easily swap model versions without redeploying the job. And if your model is large (think BERT-sized embeddings), you're burning memory on every parallel instance.
When to embed:
- Model is <500 MB and runs fast on CPU (tree ensembles, small neural nets, feature transformers)
- Latency SLA is <10 ms end-to-end
- Model updates are infrequent (weekly, monthly)
- You can afford to replicate model weights across task managers
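That last bullet is worth quantifying: every parallel task instance loads its own copy of the weights, so resident model memory scales linearly with parallelism. A back-of-envelope sketch (the function name is ours, not a Flink API):

```python
def embedded_memory_cost_gb(model_size_mb: float, parallelism: int) -> float:
    """Rough resident memory for an embedded model: one copy of the
    weights per parallel task instance."""
    return model_size_mb * parallelism / 1024.0

# A 400 MB model replicated across 32 task slots pins ~12.5 GB of RAM
cost_gb = embedded_memory_cost_gb(400, 32)
```

This is why the ~500 MB threshold matters: beyond it, replication cost dominates the latency savings.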
External Model Serving: Scale First
Ship predictions off to a dedicated inference service (TorchServe, KServe, vLLM, whatever). Your Flink job becomes a lightweight orchestrator: assemble features, send async request, collect prediction, emit result downstream.
Now you get GPU support. You can A/B test models. Inference containers scale independently. But you pay in network latency (typically 50-200 ms) and complexity: you need circuit breakers, timeout handling, and backpressure management.
When to use external serving:
- Model needs GPU (LLMs, Stable Diffusion, large transformers)
- You're running A/B tests or canary deployments
- Model updates happen weekly or more frequently
- Inference latency budget is >50 ms
The smart move? Use embedded for small, fast feature transformers. Use external for your heavy-lifting models. Combine them in a single pipeline.
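One way to encode these rules of thumb, purely as a sketch of the decision logic above (the thresholds are the ones quoted in this section, not hard limits):

```python
def choose_serving_pattern(model_size_mb: float,
                           latency_budget_ms: float,
                           needs_gpu: bool,
                           updates_per_week: float) -> str:
    """Rule-of-thumb routing between embedded and external serving."""
    if needs_gpu:
        return "external"      # Flink-side GPU support is limited
    if model_size_mb >= 500:
        return "external"      # too big to replicate per task manager
    if updates_per_week > 1:
        return "external"      # frequent swaps favor independent deploys
    if latency_budget_ms < 10:
        return "embedded"      # only in-process inference hits this SLA
    return "embedded"

# A small tree ensemble with a tight SLA and monthly retrains stays in-process
pattern = choose_serving_pattern(50, 5, needs_gpu=False, updates_per_week=0.25)
```

In practice you'd route per model, which is exactly the mixed pipeline described above.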
AsyncFunction: Calling Models Without Blocking the Pipeline
Here's where Flink shines. The RichAsyncFunction lets you fire off async requests without stalling your stream. Other events keep flowing while your inference request is in-flight. This is a fundamentally different programming model than most engineers are used to, and it's worth understanding deeply.
Most engineers coming from batch or request-response systems think synchronously. You have a request, you call the model, you wait for the response, you emit the result. The next request comes in only after you're done with the current one. This works fine for single-threaded systems. But in a streaming context, you're processing thousands of events per second. If each one blocks waiting for inference, you'll process maybe 10-20 events per second. Your inference latency becomes your throughput ceiling.
AsyncFunction inverts this. You fire off the inference request and immediately return control to the stream processor. It can accept and queue the next event. Meanwhile, background tasks collect inference responses as they arrive and emit them downstream. It's multiplexing at the stream level.
The performance impact is massive. If each request takes 100 ms and you wait for it synchronously, a single in-flight request caps you at 10 events per second. Overlap 100 requests with AsyncFunction and the same task sustains 1,000 events per second. Same latency budget, 100x the throughput.
There's a mental model shift required. You're no longer thinking "process one event at a time." You're thinking "maintain a pool of in-flight requests and collect results as they arrive." It's harder to debug because your execution model is non-deterministic. But it's essential for real-time ML at scale.
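The arithmetic behind that throughput ceiling is just Little's law: sustainable events per second equals in-flight requests divided by per-request latency. A quick sketch:

```python
def max_throughput_eps(in_flight: int, latency_ms: float) -> float:
    """Upper bound on events/sec for a given concurrency and latency
    (Little's law applied to in-flight inference requests)."""
    return in_flight * 1000.0 / latency_ms

# Blocking (one request in flight) at 100 ms caps you at 10 events/sec;
# overlapping 100 requests at the same latency sustains 1000 events/sec.
blocking = max_throughput_eps(1, 100)      # 10.0
overlapped = max_throughput_eps(100, 100)  # 1000.0
```

This is the number to check before tuning `capacity`: if your target throughput exceeds `capacity / latency`, no amount of other tuning will help.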
The Pattern
You implement RichAsyncFunction<T, R> with two methods:
- asyncInvoke(): receives an input element, fires off async work, returns a future
- timeout(): handles the case where the async call takes too long
Flink handles the magic of multiplexing thousands of in-flight requests without blocking the stream.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AsyncFunction

import asyncio
import aiohttp
import time
class MLInferenceAsyncFunction(AsyncFunction):
"""
Calls an external model server asynchronously.
Input: dict with 'features' key
Output: dict with original features + 'prediction' key
"""
def __init__(self, inference_url, timeout_ms=5000):
self.inference_url = inference_url
self.timeout_s = timeout_ms / 1000.0
self.session = None
def open(self, runtime_context):
"""Called when operator starts. Create reusable HTTP session."""
self.session = aiohttp.ClientSession()
self.request_count = 0
self.timeout_count = 0
def close(self):
"""Clean up resources."""
if self.session:
# Note: in production, use asyncio.run_coroutine_threadsafe
pass
async def async_invoke(self, value):
"""
Fire async inference request.
value: dict with 'id' and 'features'
"""
self.request_count += 1
try:
# Prepare payload
payload = {
"instances": [value["features"]]
}
# Send async request with timeout
async with self.session.post(
self.inference_url,
json=payload,
timeout=aiohttp.ClientTimeout(total=self.timeout_s)
) as resp:
result = await resp.json()
# Extract prediction (format depends on your inference server)
# Example: TorchServe returns {"predictions": [[score]]}
prediction = result.get("predictions", [[0]])[0][0]
# Return enriched record
return {
"id": value["id"],
"features": value["features"],
"prediction": prediction,
"timestamp": int(time.time() * 1000),
"inference_latency_ms": (time.time() * 1000) - value.get("sent_at", time.time() * 1000)
}
except asyncio.TimeoutError:
self.timeout_count += 1
# Return fallback: neutral prediction with error flag
return {
"id": value["id"],
"features": value["features"],
"prediction": 0.5, # neutral
"error": "inference_timeout",
"timestamp": int(time.time() * 1000)
}
except Exception as e:
# Log and return error response
return {
"id": value["id"],
"features": value["features"],
"prediction": None,
"error": str(e),
"timestamp": int(time.time() * 1000)
}
def setup_async_inference_pipeline():
"""
Builds a Flink pipeline with async model inference.
"""
env = StreamExecutionEnvironment.get_execution_environment()
# Simulate Kafka source: feature vectors
# In production, use KafkaSource
source = env.from_collection([
{"id": "user_1", "features": [0.5, 0.3, 0.8, 0.1], "sent_at": time.time() * 1000},
{"id": "user_2", "features": [0.2, 0.7, 0.4, 0.9], "sent_at": time.time() * 1000},
{"id": "user_3", "features": [0.9, 0.1, 0.5, 0.6], "sent_at": time.time() * 1000},
])
# Add async function
# AsyncDataStream.unorderedWait adds the async operator
# capacity: max concurrent requests (tune based on downstream capacity)
# timeout: max time to wait for response
async_results = source.async_invoked(
MLInferenceAsyncFunction(
inference_url="http://localhost:8000/predictions/model",
timeout_ms=5000
),
timeout_in_milli=5000,
capacity=100 # max 100 concurrent inference requests
)
# Filter out errors
valid_predictions = async_results.filter(
lambda x: x.get("prediction") is not None
)
# Print results (in production: write to Kafka, DB, etc.)
valid_predictions.print()
return env
# Example run (would execute in Flink cluster)
if __name__ == "__main__":
env = setup_async_inference_pipeline()
env.execute("Async ML Inference Pipeline")

Key Configuration Levers
Capacity: How many async requests can be in-flight at once? Set this based on:
- Downstream model server capacity
- Your latency tolerance (bigger = more queuing)
- Memory available per task manager
# Conservative: 10 concurrent requests
async_results = source.async_invoked(
inference_func,
capacity=10
)
# Aggressive: 500 concurrent requests (for high throughput)
async_results = source.async_invoked(
inference_func,
capacity=500
)

Timeout: When does Flink give up on a slow inference request?
# 5-second timeout, return error after that
async_results = source.async_invoked(
inference_func,
timeout_in_milli=5000
)

Ordering: Do you care if predictions come out in the same order as inputs?
- unorderedWait(): faster, lower latency, predictions can jump around
- orderedWait(): respects input order, adds latency to wait for slow requests
# Unordered (default): faster but may emit out of order
async_results = source.async_invoked(
inference_func,
mode=AsyncDataStream.UNORDERED_WAIT
)
# Ordered: slower but maintains input order
async_results = source.async_invoked(
inference_func,
mode=AsyncDataStream.ORDERED_WAIT
)

For most real-time systems, unorderedWait() is the right choice. You care about latency, not global ordering.
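The behavioral difference is easy to demonstrate outside Flink with plain asyncio, which is a decent mental model for what the operator does. This is a simulation, not Flink code: `as_completed` plays the role of unorderedWait, `gather` the role of orderedWait.

```python
import asyncio

async def fake_inference(event_id: int, delay_s: float) -> int:
    # Stand-in for a model call whose latency varies per event
    await asyncio.sleep(delay_s)
    return event_id

async def run() -> tuple:
    # Event 0 is slow; events 1 and 2 are fast
    delays = [0.06, 0.01, 0.02]
    tasks = [asyncio.create_task(fake_inference(i, d))
             for i, d in enumerate(delays)]

    # unorderedWait analogue: emit each result as soon as it completes
    unordered = [await fut for fut in asyncio.as_completed(tasks)]

    # orderedWait analogue: results are held until earlier ones finish
    ordered = await asyncio.gather(*tasks)
    return unordered, ordered

unordered, ordered = asyncio.run(run())
# The slow event ends up last in `unordered` instead of blocking the others;
# `ordered` preserves input order [0, 1, 2].
```

The slow event's latency is paid either way; the ordering mode only decides whether other events wait behind it.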
State Management for Feature Accumulation
Here's a common pattern: your model needs features that arrive from multiple sources. An event comes in, you need to look up historical state, enrich the event, then score. This is where Flink's state management becomes critical. Without it, you're either recomputing features on every event (expensive) or storing state outside Flink (consistency nightmare).
Consider fraud detection. You score a transaction based on not just its amount but on historical behavior: "Is this amount unusual for this user? Did they make similar purchases recently? What's their average transaction size?" These features aren't in the current event. They're computed from history. Your pipeline needs to maintain per-user state - rolling windows of transactions, aggregates, metrics.
In a naive implementation, you'd query a feature store for every transaction. That's an extra network hop, extra latency, and extra cost. You'd also be vulnerable to consistency issues: the feature store might have stale data, or you might cache features that change rapidly.
Flink's keyed state solves this elegantly. The stream is partitioned by key (user_id). Each partition independently maintains state. New transactions for a user reach the partition that owns that user's state. The partition reads the state, computes new features, updates the state, and emits the enriched event downstream. No external queries, no consistency issues, no stale data. All of this is automatic and scales horizontally across your cluster.
Example: fraud detection model needs (user_id, amount, last_transaction_amount, transaction_count_24h, user_risk_score). The amount is in the current event. Everything else lives in state.
Keyed State in Flink
Flink's keyed state is built for exactly this. You partition your stream by key (user_id), and each partition maintains its own state backend.
import time

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import MapStateDescriptor, StateTtlConfig, ValueStateDescriptor
class FeatureEnrichmentFunction(KeyedProcessFunction):
"""
Maintains per-user feature state.
Enriches incoming transactions with historical features.
"""
    def open(self, runtime_context):
        """Initialize state handles."""
        # ValueState: single value (last transaction)
        self.last_amount_state = runtime_context.get_state(
            ValueStateDescriptor("last_amount", Types.FLOAT())
        )
        # State TTL: drop history entries older than 24 hours
        ttl_config = StateTtlConfig \
            .new_builder(Time.days(1)) \
            .set_state_visibility(
                StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
            .build()
        # MapState: multiple values (transaction history). TTL is configured
        # on the descriptor, not on the state handle.
        history_descriptor = MapStateDescriptor(
            "transaction_history",
            Types.STRING(),  # key: timestamp
            Types.FLOAT()    # value: amount
        )
        history_descriptor.enable_time_to_live(ttl_config)
        self.transaction_history = runtime_context.get_map_state(history_descriptor)
        # ValueState: aggregated feature
        self.transaction_count_24h = runtime_context.get_state(
            ValueStateDescriptor("transaction_count_24h", Types.INT())
        )
def process_element(self, value, ctx):
"""
Process transaction and emit enriched feature vector.
value: {"user_id": "user_1", "amount": 50.0, "timestamp": 1234567890}
"""
user_id = value["user_id"]
amount = value["amount"]
timestamp = value["timestamp"]
# Retrieve current state
last_amount = self.last_amount_state.value() or 0.0
history = dict(self.transaction_history.items())
# Compute aggregates
recent_transactions = [
amt for ts, amt in history.items()
if float(ts) > timestamp - (24 * 3600)
]
transaction_count = len(recent_transactions)
# Prepare feature vector for model
features = {
"user_id": user_id,
"current_amount": amount,
"last_amount": last_amount,
"amount_ratio": amount / (last_amount + 0.01), # avoid division by zero
"transaction_count_24h": transaction_count,
"avg_transaction_24h": sum(recent_transactions) / max(1, len(recent_transactions)),
}
# Update state for next event
self.last_amount_state.update(amount)
self.transaction_history.put(str(timestamp), amount)
# Emit enriched event for inference
yield {
"user_id": user_id,
"features": features,
"timestamp": timestamp,
"sent_at": time.time() * 1000
}
def build_feature_enrichment_pipeline():
"""
Stream → feature enrichment → async inference → output
"""
env = StreamExecutionEnvironment.get_execution_environment()
# Source: transactions
transactions = env.from_collection([
{"user_id": "user_1", "amount": 50.0, "timestamp": 1700000000},
{"user_id": "user_1", "amount": 75.0, "timestamp": 1700000060},
{"user_id": "user_2", "amount": 200.0, "timestamp": 1700000030},
{"user_id": "user_1", "amount": 30.0, "timestamp": 1700000120},
])
# Enrich with state
enriched = transactions \
.key_by(lambda x: x["user_id"]) \
.process(FeatureEnrichmentFunction())
# Then pipe to async inference (from previous section)
enriched.print()
return env
if __name__ == "__main__":
env = build_feature_enrichment_pipeline()
env.execute("Feature Enrichment Pipeline")

Output:
{"user_id": "user_1", "features": {"current_amount": 50.0, "last_amount": 0.0, ...}, ...}
{"user_id": "user_1", "features": {"current_amount": 75.0, "last_amount": 50.0, ...}, ...}
{"user_id": "user_2", "features": {"current_amount": 200.0, "last_amount": 0.0, ...}, ...}
{"user_id": "user_1", "features": {"current_amount": 30.0, "last_amount": 75.0, ...}, ...}
State Backend: RocksDB for Production
By default, Flink keeps state in memory. For serious pipelines, switch to RocksDB:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RocksDBStateBackend backend = new RocksDBStateBackend("file:///data/flink/state", true);
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(backend);

RocksDB trades a bit of latency for massive capacity. Your state can be terabytes, spilled to disk, without OOM errors.
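If your job is PyFlink rather than Java, the rough equivalent uses `EmbeddedRocksDBStateBackend`, which supersedes the older `RocksDBStateBackend` API; the checkpoint location is configured separately from the backend. A sketch:

```python
from pyflink.datastream import StreamExecutionEnvironment, EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
# Keep working state in RocksDB on local disk instead of the JVM heap
env.set_state_backend(EmbeddedRocksDBStateBackend())
# Durable checkpoint storage is a separate setting from the state backend
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///data/flink/checkpoints")
```

The path shown is illustrative; in production you'd point checkpoint storage at durable shared storage (S3, HDFS), not a local file path.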
Model Updates Without Job Restart
The killer requirement in production: your DS team just trained a better model. You need to deploy it without stopping the pipeline. Kafka messages keep flowing, predictions keep happening, no SLA breach. This is where many streaming systems fail. Most require job restarts to pick up new model weights. A restart means seconds of unavailability. In fraud detection or trading systems, those seconds cost money.
Flink handles this with broadcast state. The idea: one stream carries data events (transactions, content requests, whatever). A separate stream carries model update signals. Every time the model stream emits an update, it's broadcast to all parallel instances of the inference operator. Each instance atomically swaps its model weights. Data events continue flowing through uninterrupted.
The elegance is that you can deploy new models without touching your data pipeline. Your model training system runs independently, trains models, publishes them to a Kafka topic. Your Flink job subscribes to that topic. New models arrive automatically and are deployed instantly. This enables a deploy frequency that's impossible with traditional infrastructure.
Enter BroadcastProcessFunction: broadcast new model weights to all parallel instances, swap them atomically, keep the pipeline running.
The Broadcast Pattern
import time

from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedBroadcastProcessFunction
from pyflink.datastream.state import MapStateDescriptor, ValueStateDescriptor

# Broadcast state is declared with a MapStateDescriptor and accessed through
# the context object inside the process methods.
MODEL_STATE_DESCRIPTOR = MapStateDescriptor(
    "model_weights",
    Types.STRING(),  # key: model_id
    Types.STRING()   # value: serialized weights (pickle or JSON)
)

class ModelBroadcastFunction(KeyedBroadcastProcessFunction):
    """
    Maintains current model weights in broadcast state.
    Listens for model update signals, swaps weights atomically.
    Applies the current model to each data element.
    """
    def open(self, runtime_context):
        # Keyed state: version tracking per key (for atomic swaps)
        self.model_version = runtime_context.get_state(
            ValueStateDescriptor("model_version", Types.LONG())
        )
        self.current_model_id = "model_v1"

    def process_element(self, value, ctx):
        """
        Process a data element using the current broadcast model.
        """
        # Read-only view of the broadcast state on the data side
        model_state = ctx.get_broadcast_state(MODEL_STATE_DESCRIPTOR)
        model_ids = [k for k, _ in model_state.items()]
        model_id = model_ids[0] if model_ids else self.current_model_id
        # In production, deserialize weights here:
        # weights = pickle.loads(model_state.get(model_id))
        # prediction = model.predict(value["features"])
        # For demo: dummy prediction
        prediction = sum(value["features"]) / len(value["features"])
        yield {
            "id": value["id"],
            "prediction": prediction,
            "model_id": model_id,
            "timestamp": int(time.time() * 1000)
        }

    def process_broadcast_element(self, model_update, ctx):
        """
        Receive new model weights from the broadcast stream.
        model_update: {"model_id": "model_v2", "weights": <serialized>}
        """
        model_id = model_update["model_id"]
        weights = model_update["weights"]
        # Clear old model, insert new one (atomic from the broadcast perspective)
        model_state = ctx.get_broadcast_state(MODEL_STATE_DESCRIPTOR)
        model_state.clear()
        model_state.put(model_id, weights)
        # Log for visibility
        print(f"Model updated: {model_id} broadcast to all instances")
def build_model_update_pipeline():
"""
Data stream + model update stream → unified inference with live updates
"""
env = StreamExecutionEnvironment.get_execution_environment()
# Data stream: transactions
data_stream = env.from_collection([
{"id": "txn_1", "features": [0.5, 0.3, 0.8]},
{"id": "txn_2", "features": [0.2, 0.7, 0.4]},
{"id": "txn_3", "features": [0.9, 0.1, 0.5]},
])
# Model update stream (in production: from Kafka topic)
model_stream = env.from_collection([
{"model_id": "model_v2", "weights": "serialized_weights_v2"},
])
    # Define the broadcast descriptor (a MapStateDescriptor keyed by model_id)
    broadcast_descriptor = MapStateDescriptor(
        "model_weights",
        Types.STRING(),
        Types.STRING()
    )
    # Connect streams: the model stream is broadcast to every parallel
    # instance processing the keyed data stream
results = data_stream \
.key_by(lambda x: x["id"]) \
.connect(model_stream.broadcast(broadcast_descriptor)) \
.process(ModelBroadcastFunction())
results.print()
return env
if __name__ == "__main__":
env = build_model_update_pipeline()
env.execute("Model Update Pipeline")

Output:
Model updated: model_v2 broadcast to all instances
{"id": "txn_1", "prediction": 0.533, "model_id": "model_v2", ...}
{"id": "txn_2", "prediction": 0.433, "model_id": "model_v2", ...}
Version Tracking for Safe Rollouts
For serious deployments, you want to know which model scored which prediction (for debugging, auditing, ML monitoring).
MODELS_DESCRIPTOR = MapStateDescriptor(
    "models",
    Types.STRING(),
    Types.STRING()
)

class VersionTrackedInferenceFunction(KeyedBroadcastProcessFunction):
    def open(self, runtime_context):
        # Per-instance version counter. Keyed state isn't accessible from
        # process_broadcast_element, but every instance receives every
        # broadcast, so per-instance counters advance in lockstep.
        self.model_version = 0

    def process_element(self, value, ctx):
        # Get model from broadcast state
        models = dict(ctx.get_broadcast_state(MODELS_DESCRIPTOR).items())
        latest_model_id = list(models.keys())[0] if models else "unknown"
        # Inference
        prediction = sum(value["features"]) / len(value["features"])
        yield {
            "id": value["id"],
            "prediction": prediction,
            "model_version": self.model_version,
            "model_id": latest_model_id,
            "timestamp": int(time.time() * 1000)
        }

    def process_broadcast_element(self, model_update, ctx):
        # Atomic model swap
        model_state = ctx.get_broadcast_state(MODELS_DESCRIPTOR)
        model_state.clear()
        model_state.put(model_update["model_id"], model_update["weights"])
        old_version = self.model_version
        self.model_version = old_version + 1
        # Log rollout event
        print(f"Rollout: v{old_version} → v{self.model_version} ({model_update['model_id']})")

Now every prediction carries its model version. If v2 has worse precision than v1, you can query predictions by version.
Architecture Diagrams
Embedded Model Serving
graph LR
A["Kafka<br/>(features)"] -->|"partition"| B["Flink Operator<br/>(keyed by user_id)"]
B -->|"load once"| C["ONNX Runtime<br/>(model in memory)"]
C -->|"<1ms latency"| D["Prediction<br/>(in-process)"]
D -->|"write"| E["Kafka<br/>(predictions)"]
F["Model File<br/>(ONNX)"] -.->|"deployed with JAR"| B
style B fill:#e1f5ff
style C fill:#fff3e0
style D fill:#c8e6c9

Strengths: Sub-millisecond latency, no network hop, CPU-efficient for small models. Weaknesses: No GPU, model updates require job restart, state scaling issues.
External Model Serving (AsyncFunction)
graph LR
A["Kafka<br/>(features)"] -->|"partition"| B["Flink<br/>AsyncFunction"]
B -->|"async HTTP"| C["Model Server<br/>(TorchServe,<br/>KServe, etc)"]
C -->|"GPU inference<br/>50-200ms"| D["Prediction"]
D -->|"collect result"| B
B -->|"write"| E["Kafka<br/>(predictions)"]
F["Model Container"] -.->|"scales independently"| C
G["Circuit Breaker<br/>Timeout<br/>Backpressure"] -.->|"safety"| B
style B fill:#e1f5ff
style C fill:#fff3e0
style D fill:#c8e6c9

Strengths: GPU support, frequent model updates, independent scaling. Weaknesses: Network latency (50-200ms), operational complexity, requires external infrastructure.
Feature Accumulation + Async Inference
graph LR
A["Kafka<br/>(transactions)"] -->|"key: user_id"| B["KeyedProcessFunction<br/>(feature enrichment)"]
B -->|"read/update"| C["Keyed State<br/>(RocksDB)<br/>• last_amount<br/>• transaction_count<br/>• risk_score"]
C -->|"enriched features"| B
B -->|"emit"| D["AsyncFunction<br/>(inference call)"]
D -->|"async HTTP<br/>capacity:100"| E["Model Server"]
E -->|"prediction"| F["Kafka<br/>(scored events)"]
G["Historical<br/>Feature Store"] -.->|"optional: backfill"| C
style B fill:#e1f5ff
style C fill:#fff9c4
style D fill:#f3e5f5
style E fill:#fff3e0

Flow: Raw event → enrich with state → call async model → emit scored result, all without blocking.
Model Update with BroadcastProcessFunction
graph LR
A["Kafka<br/>(data)"] -->|"key: user_id"| B["KeyedBroadcastProcessFunction"]
M["Kafka<br/>(model updates)"] -->|"broadcast<br/>to all"| C["Broadcast State<br/>model_id<br/>weights<br/>version"]
C -->|"read latest"| B
B -->|"atomic swap"| C
B -->|"emit prediction<br/>+ model_version"| D["Kafka<br/>(predictions)"]
E["Model<br/>Registry"] -.->|"trigger update"| M
style B fill:#e1f5ff
style C fill:#c8e6c9
style M fill:#fff3e0
style D fill:#f3e5f5

All parallel instances see the same model weights. New model deploys in seconds. Zero downtime.
Latency and Throughput Benchmarks
Let's get concrete. We built two pipelines and measured them.
Setup
Test Pipeline:
- Feature: 128-dimensional vector
- Model: Small neural net (2 hidden layers, 64 units each)
- Source: Kafka with 1000 msg/sec
- Duration: 5 minutes steady-state
- Metrics: P50, P95, P99 latency; throughput; error rate
Embedded ONNX Runtime (In-Flink)
import time

import numpy as np
import onnxruntime as ort
from pyflink.datastream.functions import RichMapFunction

class EmbeddedONNXFunction(RichMapFunction):
    def open(self, runtime_context):
        # Load the model once per task instance and keep the session warm
        self.session = ort.InferenceSession("model.onnx")
        self.input_name = self.session.get_inputs()[0].name

    def map(self, value):
        features = np.array(value["features"], dtype=np.float32).reshape(1, -1)
        start = time.time()
        output = self.session.run(None, {self.input_name: features})
        latency_ms = (time.time() - start) * 1000
        return {
            "id": value["id"],
            "prediction": float(output[0][0][0]),
            "latency_ms": latency_ms
        }

Results (1000 msg/sec, 5-minute run):
| Metric | Value |
|---|---|
| Throughput | 987 msg/sec |
| P50 latency | 0.8 ms |
| P95 latency | 2.1 ms |
| P99 latency | 4.3 ms |
| Error rate | 0% |
| CPU (1 core) | 65% |
External TorchServe (AsyncFunction)
class TorchServeAsyncFunction(AsyncFunction):
def open(self, runtime_context):
self.session = aiohttp.ClientSession()
self.url = "http://torchserve:8080/predictions/model"
async def async_invoke(self, value):
try:
async with self.session.post(
self.url,
json={"instances": [value["features"]]},
timeout=aiohttp.ClientTimeout(total=1.0)
) as resp:
result = await resp.json()
return {
"id": value["id"],
"prediction": result["predictions"][0]
}
except Exception as e:
return {"id": value["id"], "prediction": None, "error": str(e)}

Results (1000 msg/sec, 5-minute run, capacity=100):
| Metric | Value |
|---|---|
| Throughput | 1020 msg/sec |
| P50 latency | 52 ms |
| P95 latency | 98 ms |
| P99 latency | 156 ms |
| Error rate | 0.2% (network timeouts) |
| CPU (Flink) | 18% |
| GPU (TorchServe) | 45% |
Combined: Features (State) + Async Inference
# Feature enrichment + async call (pseudocode)
# transactions → [KeyedProcessFunction (state)] → [AsyncFunction (inference)] → results

Results (1000 msg/sec, 5-minute run):
| Metric | Value |
|---|---|
| Throughput | 980 msg/sec |
| P50 latency (end-to-end) | 65 ms |
| P95 latency | 145 ms |
| P99 latency | 210 ms |
| State size (RocksDB) | 450 MB |
| Error rate | 0.1% |
Breakdown (where does time go?):
- Feature enrichment: 5-8 ms (state lookup + aggregation)
- Async network call: 50-150 ms (actual inference)
- Collection + emission: 2-5 ms
Optimization Insights
To reduce latency:
- Embed small models (cuts 50+ ms network hop)
- Increase async capacity (reduces queueing)
- Use RocksDB cache tier (faster than disk hits)
- Tune parallelism (more instances = less backlog)
To improve throughput:
- Increase async capacity beyond 100 (if downstream can handle it)
- Use unorderedWait() instead of orderedWait()
- Partition more granularly (more parallelism)
- Reduce batch size in feature enrichment (emit faster)
To reduce errors:
- Implement circuit breaker (fail fast if model server is down)
- Increase timeout conservatively (don't wait forever)
- Add retry logic with exponential backoff
- Monitor error rates per model version
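The retry item deserves a sketch. A minimal async retry wrapper with exponential backoff and jitter (the function name and parameters are ours; `call` is any zero-argument coroutine function, e.g. a closure over the inference request):

```python
import asyncio
import random

async def call_with_retries(call, max_attempts: int = 3, base_delay_s: float = 0.1):
    """Retry an async inference call with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return await call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            # 0.1s, 0.2s, 0.4s... plus jitter so retries don't synchronize
            delay = base_delay_s * (2 ** attempt) * (1 + random.random() * 0.1)
            await asyncio.sleep(delay)
```

The jitter matters: without it, a brief model-server outage causes every in-flight request to retry at the same instant, which is its own thundering-herd failure mode.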
Production Checklist
Before shipping streaming ML inference to prod:
- Monitoring: Track inference latency, throughput, errors, model version distribution
- Fallback: What happens if model server dies? Return neutral prediction? Cache last result?
- State: Is state TTL configured? Will state grow unbounded?
- Model Rollback: Can you revert to previous model version in <1 minute?
- Capacity Planning: Have you load-tested at 2x expected peak throughput?
- Feature Dependencies: Do all features exist before inference? What if upstream is delayed?
- Drift Monitoring: Are you tracking prediction distribution per model version?
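For the drift-monitoring item, even a crude check beats nothing. A minimal sketch that flags a shift in the mean predicted score per model version (real systems use PSI or a Kolmogorov-Smirnov test; the threshold here is arbitrary):

```python
def mean_shift_drift(baseline: list, current: list, threshold: float = 0.1) -> bool:
    """Flag drift when the mean predicted score moves more than `threshold`
    between a baseline window and the current window."""
    baseline_mean = sum(baseline) / len(baseline)
    current_mean = sum(current) / len(current)
    return abs(current_mean - baseline_mean) > threshold

# Scores clustered around 0.3 drifting up toward 0.6 trips the alarm
drifted = mean_shift_drift([0.3, 0.32, 0.28], [0.6, 0.58, 0.62])  # True
```

Run this per model version on sliding windows of emitted predictions; a drift alarm on a freshly broadcast model is your fastest rollback signal.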
Real-World Complexity: What We Didn't Cover
The examples above are simplified for clarity. Real production systems face additional challenges. The gap between a proof-of-concept streaming pipeline and a production one that handles millions of events per day reliably is substantial.
One challenge is operator failures. In the pattern we showed (feature enrichment + async inference), if the inference operator crashes, you lose in-flight requests. You need checkpointing to recover gracefully. Flink has you covered, but configuring it correctly is non-trivial. You need to choose a state backend (in-memory, RocksDB, or external), configure checkpoint intervals, and set up distributed consensus so all operators agree on what's been processed.
Another challenge is operator skew. In our fraud detection example, some users are very active (millions of transactions per day). Others are inactive. The feature enrichment operator maintains per-user state. If state distribution is unbalanced, some parallel instances do 80% of the work while others are idle. This is a resource utilization disaster. You need monitoring and dynamic rebalancing.
A third challenge is late data. In streaming, it's common for events to arrive out of order. A transaction that happened at T=100 might not arrive until T=5000 because of network delays. Your feature enrichment needs to handle this. Do you update features retroactively? Do you reject late data? Do you keep state in a time window? These are business logic questions that require careful thought.
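Whatever policy you pick, the mechanical core is the same: compare the event's timestamp against the watermark minus an allowed-lateness window. A sketch of that cutoff (function and field names are ours):

```python
def route_event(event_ts: float, watermark_ts: float,
                allowed_lateness_s: float = 300.0) -> str:
    """Accept events within the lateness window; divert the rest to a
    side output for auditing or retroactive feature updates."""
    if event_ts >= watermark_ts - allowed_lateness_s:
        return "main"
    return "side_output"

# An event 10 minutes behind a watermark with a 5-minute window is diverted
route_event(event_ts=1000.0, watermark_ts=1600.0)  # "side_output"
```

In Flink terms, "main" corresponds to normal processing under `allowedLateness`, and "side_output" to a late-data side output you can reconcile offline.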
Managing Model Versioning Across Deployments
When you deploy a new model version, different parallel instances might use different model versions for a while (during the rolling update). You need to track which version scored which prediction.
MODEL_REGISTRY_DESCRIPTOR = MapStateDescriptor(
    "models", Types.STRING(), Types.STRING()
)

class VersionedInferenceFunction(KeyedBroadcastProcessFunction):
    """Tracks the model version used for every prediction"""
    def open(self, runtime_context):
        # Keyed MapState: record which model version scored events for this key
        self.version_assignments = runtime_context.get_map_state(
            MapStateDescriptor("version_by_partition", Types.STRING(), Types.STRING())
        )

    def process_element(self, value, ctx):
        # Get the current model version from broadcast state
        registry = dict(ctx.get_broadcast_state(MODEL_REGISTRY_DESCRIPTOR).items())
        model_version = next(iter(registry)) if registry else "unknown"
        # Record the version in effect at this processing time
        now = ctx.timer_service().current_processing_time()
        self.version_assignments.put(str(now), model_version)
        # Run inference with the current model (_inference is a placeholder)
        prediction = self._inference(value, model_version)
        yield {
            "id": value["id"],
            "prediction": prediction,
            "model_version": model_version,
            "timestamp": int(time.time() * 1000)
        }

This ensures you can analyze predictions by model version post-hoc. Critical for debugging: "Did v2 perform worse than v1?"
Backpressure and Flow Control
Flink is smart about backpressure: if downstream can't keep up, upstream slows down. But you need to design for it.
```python
from pyflink.common import Configuration

# Configure timeouts and buffering
config = Configuration()
# Flush network buffers at least every 100 ms (trades throughput for latency)
config.set_string("execution.buffer-timeout", "100ms")
# Cap network buffer memory so backpressure triggers before the heap fills
config.set_string("taskmanager.memory.network.max", "1gb")
# Downstream operators apply backpressure automatically when buffers fill up
```

Without proper configuration, a slow model server causes Flink to buffer excessively, eating memory and latency.
Testing Streaming Pipelines
Unit testing Flink is hard. Integration testing is harder. You need a testing strategy.
```python
# Illustrative harness: Flink's async I/O operator lives in the Java/Scala
# DataStream API, so treat the test base and async call below as pseudocode
# to adapt to your own test setup.
from pyflink.test.utils import StreamingTestBase


class StreamingMLPipelineTest(StreamingTestBase):

    def test_async_inference_with_timeout(self):
        """A slow model server should trigger the async timeout path."""
        # Bounded source with two test records
        source = self.env.from_collection([
            {"id": "1", "features": [0.5, 0.3, 0.8]},
            {"id": "2", "features": [0.2, 0.7, 0.4]},
        ])

        # Async function whose simulated latency exceeds the timeout
        results = source.async_invoked(
            SlowModelAsyncFunction(delay_ms=3000),  # 3 s simulated delay
            timeout_in_milli=1000,                  # 1 s timeout
        )

        results.add_sink(self.test_sink)
        self.env.execute("test")

        # Verify: every record should come back as a timeout error
        results_list = self.test_sink.get_results(purge=True)
        assert any(r.get("error") == "timeout" for r in results_list)
```

Test the edge cases: model server down, high latency, malformed responses. These failures are guaranteed to happen in production.
Operational Patterns
The Two-Model Pattern
Run two models: a fast one (for speed) and an accurate one (for correctness). Score with the fast model, then occasionally validate with the accurate model.
```python
import logging
import random
import time

from pyflink.datastream.functions import KeyedBroadcastProcessFunction

logger = logging.getLogger(__name__)


class DualModelFunction(KeyedBroadcastProcessFunction):
    """
    Fast model for primary inference.
    Accurate model for validation/monitoring (10% of traffic).
    """

    def open(self, runtime_context):
        self.fast_model = load_model("fast_model_v1")        # your loading helper
        self.accurate_model = load_model("accurate_model_v3")
        self.sample_rate = 0.1  # validate 10% of traffic

    def process_element(self, value, ctx):
        # Fast inference (always)
        fast_pred = self.fast_model.predict(value["features"])

        # Accurate inference (sampled)
        accurate_pred = None
        if random.random() < self.sample_rate:
            accurate_pred = self.accurate_model.predict(value["features"])
            # Log discrepancies for model monitoring; only compare when
            # the accurate model actually ran
            if abs(fast_pred - accurate_pred) > 0.05:
                logger.warning(
                    f"Model divergence: fast={fast_pred}, accurate={accurate_pred}"
                )

        yield {
            "id": value["id"],
            "prediction": fast_pred,
            "accuracy_check": accurate_pred,
            "timestamp": int(time.time() * 1000),
        }
```

This gives you continuous validation without sacrificing latency.
Circuit Breaker Pattern
If your model server is overloaded or down, fail fast instead of stalling.
```python
import logging
import time

logger = logging.getLogger(__name__)


class CircuitBreakerAsyncFunction(AsyncFunction):
    """
    Fails fast if the model server is unreachable.
    Prevents cascading failures.
    """

    def __init__(self, model_url, failure_threshold=5, timeout_s=10):
        self.model_url = model_url
        self.failure_threshold = failure_threshold
        self.timeout_s = timeout_s          # how long the circuit stays open
        self.failure_count = 0
        self.circuit_broken = False
        self.last_check_time = time.time()

    async def async_invoke(self, value):
        # Check whether the circuit is open
        if self.circuit_broken:
            if time.time() - self.last_check_time > self.timeout_s:
                # Half-open: allow one attempt through after the cool-down
                self.circuit_broken = False
                self.failure_count = 0
            else:
                # Circuit is open: return a fallback without calling the server
                return {
                    "id": value["id"],
                    "prediction": 0.5,
                    "error": "circuit_broken",
                }

        try:
            # Attempt inference
            result = await self._call_model(value)
            self.failure_count = 0  # reset on success
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count > self.failure_threshold:
                self.circuit_broken = True
                self.last_check_time = time.time()
                logger.error(f"Circuit breaker open: {self.failure_count} failures")
            # Return a fallback on failure
            return {
                "id": value["id"],
                "prediction": 0.5,
                "error": str(e),
            }
```

When the model server is flaking, the circuit breaker prevents wasted requests and gives the server room to recover.
Key Takeaways
Streaming ML inference is pattern matching: recognize your constraints (latency budget, model size, GPU access, update frequency) and choose your architecture accordingly.
Use embedded models for small, fast feature transformers that almost never change. Use external serving for heavy-lift models (LLMs, computer vision) that need GPUs and frequent updates. Use keyed state for efficient feature accumulation, and swap model weights via broadcast state without job restarts.
The AsyncFunction pattern is your superpower here. Fire async requests, let Flink multiplex thousands in flight, collect results as they arrive. No blocking. No pipeline backing up.
Measure everything. Embedded ONNX Runtime can hit sub-millisecond latencies. External TorchServe adds 50-200 ms but gives you GPU and A/B testing. State lookups add 5-10 ms. The math compounds fast.
And remember: the best ML inference pipeline is the one your team can operate. Keep it simple, instrument it, test it at 2x load. That's how you sleep at night.
Closing Thoughts on Production Readiness
The difference between a proof-of-concept streaming pipeline and a production one is operational maturity:
- Monitoring: Track latency, throughput, errors, model versions. Set meaningful alerts.
- Fallbacks: When things break (and they will), have a safe failure mode.
- Testing: Unit test operators. Integration test the full pipeline. Chaos test at scale.
- Runbooks: When something goes wrong at 2 AM, your on-call engineer should know exactly what to do.
- Observability: Every decision the system makes should be queryable. Why did we return this prediction? What model version scored it?
Build these things before you need them, and you'll stay calm when production starts behaving unexpectedly.
The Scaling Reality: What Happens When Production Traffic Arrives
In theory, streaming ML inference is elegant. You set up your Flink pipeline, model weights are loaded, requests flow through async functions, predictions come out the other side. In practice, the moment real traffic arrives, things get complicated. Your model latency is fine at 100 requests per second, but at 10,000 requests per second, strange things start happening. Downstream services can't keep up with predictions you're emitting. Your state grows unbounded because you're keeping historical data for every user ever. Circuit breakers trigger because the inference service is overloaded. You get calls at 2 AM about timeout errors cascading through the system.
This is where operational maturity separates production systems from proof-of-concepts. You need to build observability into your streaming pipeline from day one. That means tracking not just latency and throughput but understanding the distribution of both. You care about P99 latency because that's what your worst-case users experience. You care about the 95th percentile inference service response time because that determines your queue depth. You care about error rates by error type because different errors require different responses. Some are transient and warrant retries. Others are permanent and need different handling.
The teams that excel at streaming ML inference don't just optimize for average case. They stress test at 3x peak load and design their system to gracefully degrade when that happens. They implement circuit breakers that prevent cascade failures. They use bulkheads to isolate different inference services so a problem in one doesn't tank the others. They implement timeouts at every network boundary to prevent slow requests from accumulating. And critically, they instrument their system so that when something goes wrong at 2 AM, their on-call engineer can drill down in minutes, not hours.
State management deserves special attention at scale. Early on, you might keep weeks of customer history in Flink state. That works fine for a thousand users. For a million users, that's terabytes of state, and your RocksDB instance becomes a bottleneck. The fix isn't to throw more hardware at the problem. It's to be ruthless about what state you actually need. Do you need all thousand transactions from a customer, or just the last ten? Do you need the exact amounts, or just aggregates like count and sum? By compressing your state, you reduce memory usage and improve state lookup latency.
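In code, being "ruthless" about state usually means replacing raw history with a fixed-size summary. A sketch of a per-user feature summary that keeps only running aggregates plus the last ten amounts (the specific fields are illustrative):

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class UserSummary:
    """Constant-size replacement for a user's full transaction history."""
    count: int = 0
    total: float = 0.0
    recent: deque = field(default_factory=lambda: deque(maxlen=10))

    def update(self, amount: float) -> None:
        self.count += 1
        self.total += amount
        self.recent.append(amount)  # deque silently drops the oldest beyond 10

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0


s = UserSummary()
for amt in [10.0, 20.0, 30.0]:
    s.update(amt)
print(s.count, s.total, s.mean)  # 3 60.0 20.0
```

Whether this summary lives in a Flink `ValueState` or elsewhere, its size is constant per user, so total state scales with the number of users rather than the number of events.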
Real-World Failure Modes and Recovery Patterns
Every streaming system fails in predictable ways. The art is building recovery into your architecture. One common failure mode is when your inference service becomes temporarily overloaded. Flink keeps accepting requests and queuing them. The queue grows. Soon you have millions of elements queued, consuming all available memory. Your Flink cluster crashes from OOM. When it restarts, it has to replay all queued requests against an already-overloaded inference service, and the cycle repeats. The fix is explicit backpressure management. If your inference service can handle 1000 requests per second, configure your async function capacity to match. Once capacity is full, Flink automatically slows down the source, preventing queue explosion.
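Matching async capacity to the server is an application of Little's law: in-flight requests approximately equal throughput times latency. A sketch of the sizing arithmetic (the numbers and headroom factor are illustrative):

```python
import math


def async_capacity(target_rps: float, p99_latency_s: float,
                   headroom: float = 1.2) -> int:
    """In-flight request budget per Little's law, with a safety margin."""
    return math.ceil(target_rps * p99_latency_s * headroom)


# A server sustaining 1000 req/s at 200 ms p99 needs ~240 slots in flight
print(async_capacity(1000, 0.2))  # 240
```

Setting the async operator's capacity to this value means the queue saturates at a bounded size and backpressure propagates to the source instead of memory filling up.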
Another common failure is model corruption or version mismatch. You deploy a new model version but forgot to update your feature engineering logic to match. Now you're sending the wrong features for the new model, and quality drops. By the time you notice, thousands of bad predictions have been emitted. The fix is version coupling. Your model version and feature version should always be updated together in a single atomic commit. Your deployment tooling should enforce this. And critically, you should A/B test new models on a small percentage of traffic before rolling out to everyone.
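A lightweight way to enforce that coupling is a deployment manifest that pins model and feature versions together and is validated before the job starts. A sketch, where the manifest shape is an assumption about your deployment tooling:

```python
# Deployed atomically in one commit: model and feature versions move together
MANIFEST = {"model_version": "v2", "feature_version": "v2"}


def validate_manifest(manifest: dict) -> None:
    """Refuse to start the job if model and feature pipelines are out of sync."""
    if manifest["model_version"] != manifest["feature_version"]:
        raise ValueError(
            f"Version mismatch: model={manifest['model_version']} "
            f"features={manifest['feature_version']}"
        )


validate_manifest(MANIFEST)  # passes; a mismatch raises before deployment
```

Running this check in CI and again at job startup turns a silent quality regression into a loud deploy-time failure.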
State TTL and management is another critical area. Without proper TTL configuration, your state backend grows unbounded. At some point, you run out of disk space and the pipeline crashes. RocksDB compaction runs but can't reclaim space if TTL isn't configured. By configuring TTL aggressively (24 hours for transaction history, 7 days for user profiles), you ensure your state stays within bounds. But you need to pick TTLs based on your actual use case, not defaults.
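With PyFlink's state TTL support, the 24-hour transaction-history bound described above looks roughly like this (descriptor names and types are illustrative):

```python
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import MapStateDescriptor, StateTtlConfig

# Expire transaction-history entries 24 hours after the last write
ttl = (
    StateTtlConfig.new_builder(Time.hours(24))
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build()
)

tx_history = MapStateDescriptor("tx_history", Types.STRING(), Types.FLOAT())
tx_history.enable_time_to_live(ttl)
```

With RocksDB, expired entries are reclaimed lazily (on access and during compaction), so disk usage shrinks gradually rather than instantly at expiry.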
Debugging Streaming Inference Issues
When something goes wrong in a streaming pipeline, debugging is fundamentally different from batch systems. You can't easily replay historical events. You can't just add logging and re-run the job. You have to instrument your running pipeline and collect metrics in real time. This is why observability is critical. You need to emit structured logs for every inference request: input features, model version used, output prediction, latency, timestamp. These logs become your source of truth for debugging.
Many teams miss this and end up with sparse logs that don't contain the information needed to debug. By the time they realize they need more detail, the problematic requests have passed and they can't diagnose the issue. Build comprehensive logging into your inference operators. Emit JSON with full context. Send to a centralized logging system. Set up alerts for unusual patterns. When a production incident happens, your logs will tell you exactly what's happening in your pipeline.
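A minimal sketch of what such a structured log record can look like (the field set is an illustrative assumption; adapt it to your schema):

```python
import json
import logging
import time

logger = logging.getLogger("inference")


def log_inference(record_id: str, features: list, model_version: str,
                  prediction: float, latency_ms: float) -> str:
    """Emit one JSON line per prediction: the source of truth for debugging."""
    entry = {
        "id": record_id,
        "features": features,
        "model_version": model_version,
        "prediction": prediction,
        "latency_ms": round(latency_ms, 2),
        "ts": int(time.time() * 1000),
    }
    line = json.dumps(entry)
    logger.info(line)  # ship to your centralized logging system
    return line


line = log_inference("txn-1", [0.5, 0.3], "v2", 0.91, 12.345)
print(json.loads(line)["model_version"])  # v2
```

Because every record carries the model version and full input, you can later slice error rates and prediction distributions by version without re-running anything.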
The Lifecycle of a Streaming ML System
A mature streaming ML system goes through predictable lifecycle phases. First is the proof-of-concept phase where you're validating that the architecture works at all. You run small experiments, validate that async inference actually improves throughput, confirm that state management handles your use case. This phase is all about learning.
The second phase is production launch. You've proven the concept works. Now you deploy to real infrastructure, real traffic, real scale. This is where you discover all the gotchas. Your inference latency is higher than expected because of cold starts. Your state backend is slower than expected because RocksDB needs tuning. Your Kubernetes cluster has resource contention you didn't anticipate. You spend weeks optimizing until the system stabilizes.
The third phase is optimization. You've got a working system. Now you hunt for inefficiencies. You profile CPU usage and find that 30% is burned on serialization. You batch requests to the inference service and reduce network overhead. You implement caching and cut inference calls in half. You implement model pruning and reduce model size. These optimizations are small individually but compound to meaningful improvements.
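The caching step can be as simple as memoizing on the feature values, under the assumption that identical features produce identical predictions for a given model version. A sketch with a stand-in for the expensive inference call:

```python
from functools import lru_cache

calls = {"model": 0}


def _model_predict(features: tuple) -> float:
    """Stand-in for the expensive remote inference call."""
    calls["model"] += 1
    return sum(features) / len(features)


@lru_cache(maxsize=10_000)
def cached_predict(model_version: str, features: tuple) -> float:
    # Cache key includes the model version so new deploys bypass old entries
    return _model_predict(features)


for _ in range(5):
    cached_predict("v2", (0.5, 0.3, 0.8))
print(calls["model"])  # 1: four of the five calls were served from cache
```

The win depends entirely on how often identical feature vectors repeat in your traffic, so measure the hit rate before and after.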
The fourth phase is maintenance and evolution. Your system is stable, but the world changes. New model versions come out and you need to deploy them safely. Traffic patterns shift and you need to adjust configuration. Hardware gets cheaper and you can run more sophisticated models. User expectations evolve and you need lower latency. Your streaming pipeline needs to evolve with all of this while maintaining stability and avoiding regressions.