Chapter 18.3: Protobuf Integration
18.3.1 The urban_sim.proto Schema
Protocol Buffers provide the canonical data contract between all system components. The schema defines every message that crosses a process boundary, ensuring type safety between Python physics engines, Java SUMO controllers, and any future components.
syntax = "proto3";

package urban_sim;

// ──────────── Core Data Types ────────────
message VehicleState {
  string id = 1;
  double x = 2;
  double y = 3;
  double speed = 4;          // m/s
  double acceleration = 5;   // m/s^2
  double angle = 6;          // degrees
  string edge_id = 7;
  string lane_id = 8;
  double nox_emission = 9;   // mg/s
  double pmx_emission = 10;  // mg/s
  double co2_emission = 11;  // mg/s
}

message SimStep {
  double time = 1;           // simulation time (s)
  repeated VehicleState vehicles = 2;
  int32 step_number = 3;
}

// ──────────── Canyon Pollution ────────────
message CanyonGeometry {
  string edge_id = 1;
  double height = 2;          // H (m)
  double width = 3;           // W (m)
  double length = 4;          // L (m)
  double wind_speed = 5;      // u_H (m/s)
  double wind_direction = 6;  // degrees from canyon axis
}

message PollutionRequest {
  repeated CanyonGeometry canyons = 1;
  repeated VehicleState vehicles = 2;
  double background_no2 = 3;  // μg/m³
}

message CanyonPollution {
  string edge_id = 1;
  double c_direct = 2;          // μg/m³
  double c_recirculation = 3;   // μg/m³
  double c_total = 4;           // μg/m³
  double emission_density = 5;  // mg/m/s
  bool exceeds_who = 6;
}

message PollutionResponse {
  repeated CanyonPollution results = 1;
  double computation_time_ms = 2;
}

// ──────────── MFG Routing ────────────
message MFGRequest {
  repeated EdgeDensity densities = 1;
  repeated CanyonPollution pollution = 2;
  double lambda_pollution = 3;  // pollution cost weight
}

message EdgeDensity {
  string edge_id = 1;
  double density = 2;  // veh/km
  double flow = 3;     // veh/hr
}

message RouteUpdate {
  string vehicle_id = 1;
  repeated string edge_ids = 2;  // new route
}

message MFGResponse {
  repeated RouteUpdate routes = 1;
  double nash_gap = 2;  // MFG convergence metric
  double computation_time_ms = 3;
}

// ──────────── Signal Control ────────────
message SignalRequest {
  string tls_id = 1;
  repeated QueueLength queues = 2;
  repeated CanyonPollution nearby_pollution = 3;
  double current_time = 4;
}

message QueueLength {
  string lane_id = 1;
  int32 queue_count = 2;
  double queue_length_m = 3;
}

message SignalResponse {
  string tls_id = 1;
  int32 phase_index = 2;
  double green_time = 3;  // seconds
  double computation_time_ms = 4;
}

// ──────────── Service Definition ────────────
service UrbanPhysicsService {
  // Unary RPCs
  rpc ComputePollution(PollutionRequest) returns (PollutionResponse);
  rpc ComputeRoutes(MFGRequest) returns (MFGResponse);
  rpc ComputeSignal(SignalRequest) returns (SignalResponse);

  // Bidirectional streaming (for real-time coupling)
  rpc StreamSimulation(stream SimStep) returns (stream PollutionResponse);
}

18.3.2 Bidirectional Streaming
The StreamSimulation RPC uses gRPC bidirectional streaming. The SUMO controller streams SimStep messages (vehicle states at each timestep), and the physics server responds with PollutionResponse messages:
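SUMO Controller (Java)                    Physics Server (Python)
        │                                          │
        │──── SimStep(t=0, vehicles) ────────────→ │
        │                                          │── compute OSPM
        │ ←────────── PollutionResponse(t=0) ───── │
        │                                          │
        │──── SimStep(t=1, vehicles) ────────────→ │
        │                                          │── compute OSPM + MFG
        │ ←────────── PollutionResponse(t=1) ───── │
        │ ←────────── MFGResponse(routes) ──────── │
        │                                          │
        ⋮                                          ⋮

Streaming avoids the per-call setup cost of issuing a new RPC for each timestep: the gRPC HTTP/2 connection stays open, and all messages travel on one long-lived stream over the same socket. Note that StreamSimulation is declared as returning only PollutionResponse messages, so the MFGResponse arrow in the diagram has to travel by another path, for example a separate ComputeRoutes call.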
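The shape of a bidirectional stream can be sketched in plain Python without gRPC: the handler receives an iterator of steps and yields one response per step, and the client produces steps lazily. The dataclasses below are simplified, hypothetical stand-ins for the generated SimStep and PollutionResponse classes, and the per-step work is a placeholder (it only counts vehicles), not an OSPM computation.

```python
from dataclasses import dataclass, field
from typing import Iterable, Iterator, List


@dataclass
class SimStepStub:
    """Hypothetical stand-in for the generated SimStep message."""
    time: float
    step_number: int
    vehicle_ids: List[str] = field(default_factory=list)


@dataclass
class ResponseStub:
    """Hypothetical stand-in for the generated PollutionResponse message."""
    step_number: int
    n_vehicles: int


def stream_simulation(requests: Iterable[SimStepStub]) -> Iterator[ResponseStub]:
    """Consume steps one at a time and yield one response per step."""
    for step in requests:
        # Placeholder for the real per-step physics.
        yield ResponseStub(step.step_number, len(step.vehicle_ids))


def client_steps(n: int) -> Iterator[SimStepStub]:
    """Client side: emit steps lazily instead of building the full list first."""
    pool = ["veh0", "veh1"]
    for i in range(n):
        yield SimStepStub(time=float(i), step_number=i, vehicle_ids=pool[: i % 3])


responses = list(stream_simulation(client_steps(4)))
```

Because both sides are generators, no step is built before the previous response has been consumed, which mirrors how a streaming handler processes messages as they arrive rather than waiting for the whole run.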
18.3.3 Python PhysicsServicer
The Python server dispatches incoming requests to specialised engines. Each engine corresponds to a module in this course:
import time

import urban_sim_pb2_grpc
from urban_sim_pb2 import MFGResponse, PollutionResponse, SignalResponse

# CanyonPollutionEngine, MFGRoutingEngine and SignalControlEngine come from the
# course modules noted below; their imports are omitted here.


class UrbanPhysicsServicer(urban_sim_pb2_grpc.UrbanPhysicsServiceServicer):
    """gRPC servicer dispatching to physics engines."""

    def __init__(self):
        self.pollution_engine = CanyonPollutionEngine()  # Module 16
        self.mfg_engine = MFGRoutingEngine()             # Module 12
        self.signal_engine = SignalControlEngine()       # Module 13

    def ComputePollution(self, request, context):
        t0 = time.perf_counter()
        results = []
        for canyon in request.canyons:
            # Aggregate vehicle emissions on this edge
            edge_vehicles = [v for v in request.vehicles
                             if v.edge_id == canyon.edge_id]
            # Emission density in mg/m/s; max() guards against zero-length canyons
            S_e = sum(v.nox_emission for v in edge_vehicles) / max(canyon.length, 1)
            # OSPM computation
            conc = self.pollution_engine.ospm(S_e, canyon, request.background_no2)
            results.append(conc)
        dt = (time.perf_counter() - t0) * 1000
        return PollutionResponse(results=results, computation_time_ms=dt)

    def ComputeRoutes(self, request, context):
        t0 = time.perf_counter()
        routes = self.mfg_engine.solve(request.densities, request.pollution,
                                       request.lambda_pollution)
        dt = (time.perf_counter() - t0) * 1000
        return MFGResponse(routes=routes, nash_gap=self.mfg_engine.gap,
                           computation_time_ms=dt)

    def ComputeSignal(self, request, context):
        t0 = time.perf_counter()
        phase, green = self.signal_engine.pmp_control(request)
        dt = (time.perf_counter() - t0) * 1000
        return SignalResponse(tls_id=request.tls_id, phase_index=phase,
                              green_time=green, computation_time_ms=dt)

    def StreamSimulation(self, request_iterator, context):
        """Bidirectional streaming: process SimSteps, yield PollutionResponses."""
        # _process_step and _update_mfg_routes are helper methods not shown here.
        for sim_step in request_iterator:
            # Process each timestep
            pollution = self._process_step(sim_step)
            yield pollution
            # Periodically update routes (every 30 steps, including step 0)
            if sim_step.step_number % 30 == 0:
                self._update_mfg_routes(sim_step)

18.3.4 Java SUMOController with gRPC Client
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
// Imports of the classes generated from urban_sim.proto are omitted here.

public class SUMOController {
    private final Socket traciSocket;          // Binary TraCI to SUMO
    private final ManagedChannel grpcChannel;  // gRPC to Python physics
    private final UrbanPhysicsServiceGrpc.UrbanPhysicsServiceStub asyncStub;

    public SUMOController(String sumoHost, int sumoPort,
                          String physicsHost, int physicsPort) throws IOException {
        // TraCI connection (the Socket constructor can throw IOException)
        traciSocket = new Socket(sumoHost, sumoPort);
        // gRPC connection
        grpcChannel = ManagedChannelBuilder
            .forAddress(physicsHost, physicsPort)
            .usePlaintext()
            .build();
        asyncStub = UrbanPhysicsServiceGrpc.newStub(grpcChannel);
    }

    // traciSimulationStep(), getVehicleStates() and rerouteVehiclesFromEdge()
    // are TraCI helper methods that are not shown in this listing.

    public void runSimulation(int nSteps) {
        StreamObserver<SimStep> requestStream = asyncStub.streamSimulation(
            new StreamObserver<PollutionResponse>() {
                @Override
                public void onNext(PollutionResponse response) {
                    // Called on a gRPC thread, not the simulation loop thread.
                    // Apply pollution-aware rerouting via TraCI
                    for (CanyonPollution p : response.getResultsList()) {
                        if (p.getExceedsWho()) {
                            rerouteVehiclesFromEdge(p.getEdgeId());
                        }
                    }
                }

                @Override
                public void onCompleted() { /* stream done */ }

                @Override
                public void onError(Throwable t) { t.printStackTrace(); }
            }
        );

        for (int step = 0; step < nSteps; step++) {
            // 1. Advance SUMO via TraCI
            traciSimulationStep();
            // 2. Collect vehicle states via TraCI
            List<VehicleState> vehicles = getVehicleStates();
            // 3. Send to physics server via gRPC
            SimStep simStep = SimStep.newBuilder()
                .setTime(step * 1.0)
                .setStepNumber(step)
                .addAllVehicles(vehicles)
                .build();
            requestStream.onNext(simStep);
        }
        requestStream.onCompleted();
    }
}

18.3.5 Deployment: startup.sh
The startup script orchestrates the three processes in the correct order:
#!/bin/bash
# startup.sh: launch the integrated urban simulation
set -e

SUMO_CFG="city.sumocfg"
PHYSICS_PORT=50051
TRACI_PORT=8813
JAVA_JAR="sumo-controller.jar"

# Stop background processes on any exit, including failures caught by set -e
cleanup() {
    kill ${PHYSICS_PID:-} ${SUMO_PID:-} 2>/dev/null || true
}
trap cleanup EXIT

echo "=== Urban Simulation Startup ==="

# Step 1: Start Python physics server (gRPC)
echo "[1/3] Starting physics server on port $PHYSICS_PORT..."
python3 physics_server.py \
    --port "$PHYSICS_PORT" \
    --canyon-config canyon_geometries.json \
    --workers 4 &
PHYSICS_PID=$!
sleep 2  # Fixed delay; assumes the server is ready within 2 s

# Step 2: Start SUMO with TraCI
echo "[2/3] Starting SUMO with TraCI on port $TRACI_PORT..."
sumo -c "$SUMO_CFG" \
    --remote-port "$TRACI_PORT" \
    --emission-output emission.xml \
    --step-length 1.0 \
    --num-clients 1 &
SUMO_PID=$!
sleep 1  # Fixed delay; assumes SUMO is listening within 1 s

# Step 3: Start Java controller (runs in the foreground)
echo "[3/3] Starting Java SUMO controller..."
java -jar "$JAVA_JAR" \
    --sumo-host localhost --sumo-port "$TRACI_PORT" \
    --physics-host localhost --physics-port "$PHYSICS_PORT" \
    --steps 3600

echo "=== Simulation Complete ==="
# Cleanup runs automatically through the EXIT trap

18.3.6 Latency Budget Analysis
The total time per simulation step must stay within the step length to maintain real-time (or faster) execution:
$$\tau_{\text{step}} = \tau_{\text{SUMO}} + \tau_{\text{TraCI}} + \tau_{\text{bridge}} + \tau_{\text{physics}} + \tau_{\text{control}}$$
For 1000 vehicles at 1-second simulation steps:
| Component | Time (ms) | % of Budget | Scales With |
|---|---|---|---|
| \(\tau_{\text{SUMO}}\): simulation step | ~20 | 2.0% | Vehicles, network size |
| \(\tau_{\text{TraCI}}\): state collection | ~5 | 0.5% | Vehicles queried |
| \(\tau_{\text{bridge}}\): gRPC transfer | ~0.3 | 0.03% | Message size |
| \(\tau_{\text{physics}}\): OSPM + k-epsilon | ~50 | 5.0% | Edges with canyons |
| \(\tau_{\text{control}}\): MFG + signal | ~100 | 10.0% | Network, MFG iterations |
| Total | ~175.3 | 17.5% | — |
| Headroom | ~824.7 | 82.5% | — |
With 82.5% headroom, the system can comfortably handle 1000 vehicles at 1-second steps. The real bottleneck appears at ~5000 vehicles, where SUMO itself takes ~100 ms and the physics engine takes ~250 ms for all canyon edges.
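The arithmetic behind the table can be reproduced in a few lines. This is a minimal sketch that uses only the per-component estimates listed above and assumes, as the sum formula does, that the components run one after another within a step; the variable names are illustrative.

```python
STEP_LENGTH_MS = 1000.0  # 1-second simulation step

# Per-component estimates for 1000 vehicles, copied from the table above.
budget_ms = {
    "SUMO": 20.0,
    "TraCI": 5.0,
    "bridge": 0.3,
    "physics": 50.0,
    "control": 100.0,
}

total_ms = sum(budget_ms.values())
headroom_ms = STEP_LENGTH_MS - total_ms
# How many times faster than real time the loop could run at this load.
real_time_factor = STEP_LENGTH_MS / total_ms

for name, ms in budget_ms.items():
    print(f"tau_{name:<8} {ms:7.1f} ms  {100 * ms / STEP_LENGTH_MS:5.2f}%")
print(f"total        {total_ms:7.1f} ms  {100 * total_ms / STEP_LENGTH_MS:5.2f}%")
print(f"headroom     {headroom_ms:7.1f} ms  {100 * headroom_ms / STEP_LENGTH_MS:5.2f}%")
```

With these figures the loop uses 175.3 ms of each 1000 ms step, so the simulation could in principle run about 5.7 times faster than real time before the budget is exhausted.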
18.3.7 Python: Complete Physics Servicer
This code implements the full physics servicer, including the three engines (CanyonPollutionEngine, MFGRoutingEngine, SignalControlEngine) and a latency benchmark simulating realistic workloads.
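As a small illustration of the benchmark idea only, the sketch below times the emission-density aggregation step of ComputePollution on synthetic data using the standard library. It is not the course's servicer: Vehicle and Canyon are hypothetical stand-ins for VehicleState and CanyonGeometry, the OSPM, MFG, and signal computations are left out, and the synthetic ranges for canyon length and NOx emission are arbitrary assumptions. Timings depend on the machine and are not comparable with the latency table.

```python
import random
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Vehicle:
    """Hypothetical stand-in for VehicleState (only the fields used here)."""
    edge_id: str
    nox_emission: float  # mg/s


@dataclass
class Canyon:
    """Hypothetical stand-in for CanyonGeometry (only the fields used here)."""
    edge_id: str
    length: float  # m


def emission_density(canyons: List[Canyon], vehicles: List[Vehicle]) -> Dict[str, float]:
    """Return S_e in mg/m/s per canyon edge: summed NOx divided by canyon length."""
    nox_by_edge: Dict[str, float] = defaultdict(float)
    for v in vehicles:  # single pass instead of re-filtering vehicles per canyon
        nox_by_edge[v.edge_id] += v.nox_emission
    return {c.edge_id: nox_by_edge.get(c.edge_id, 0.0) / max(c.length, 1.0)
            for c in canyons}


def benchmark(n_vehicles: int, n_canyons: int, repeats: int = 20, seed: int = 0) -> float:
    """Return the mean wall-clock time in ms of one aggregation call on synthetic data."""
    rng = random.Random(seed)
    canyons = [Canyon(f"e{i}", rng.uniform(50.0, 300.0)) for i in range(n_canyons)]
    vehicles = [Vehicle(f"e{rng.randrange(n_canyons)}", rng.uniform(0.0, 5.0))
                for _ in range(n_vehicles)]
    t0 = time.perf_counter()
    for _ in range(repeats):
        emission_density(canyons, vehicles)
    return (time.perf_counter() - t0) * 1000.0 / repeats


if __name__ == "__main__":
    for n in (1000, 5000):
        print(f"{n} vehicles: {benchmark(n, n_canyons=100):.3f} ms per call")
```

Grouping emissions by edge in one pass keeps the aggregation linear in the number of vehicles, whereas filtering the vehicle list once per canyon grows with the product of vehicles and canyons.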
18.3.8 Key Takeaways
- The urban_sim.proto schema defines all inter-process messages: VehicleState, SimStep, PollutionRequest/Response, MFGRequest/Response, SignalRequest/Response.
- Bidirectional streaming (stream SimStep → stream PollutionResponse) eliminates per-request overhead for the inner simulation loop.
- The PhysicsServicer dispatches to three engines: CanyonPollutionEngine (Module 16), MFGRoutingEngine (Module 12), and SignalControlEngine (Module 13).
- The Java SUMOController combines binary TraCI (to SUMO) with gRPC (to Python physics), bridging the Java and Python ecosystems.
- The latency budget for 1000 vehicles at 1-second steps leaves over 80% headroom, with SUMO taking ~20 ms, physics ~50 ms, and MFG plus signal control ~100 ms.
- The startup.sh script orchestrates the three processes: Python server, SUMO, Java controller, in the correct dependency order.
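
The dependency order in startup.sh relies on fixed sleep delays between launches. One hedged alternative is to poll until the physics server's port accepts TCP connections before starting the next process. The helper below, wait_for_port, is a hypothetical name and not part of the original scripts. It is meant for the gRPC port only: probing the TraCI port this way is likely unsafe, because SUMO may treat the probe as its controlling client.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 10.0,
                  interval_s: float = 0.1) -> bool:
    """Poll until a TCP connection to (host, port) succeeds.

    Returns True on the first successful connection, or False if
    timeout_s elapses without one.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # The connection is closed again as soon as it is established.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            time.sleep(interval_s)
    return False


if __name__ == "__main__":
    # 50051 is the PHYSICS_PORT value used in startup.sh.
    ready = wait_for_port("localhost", 50051, timeout_s=10.0)
    print("physics server ready" if ready else "physics server not reachable")
```

An open port only shows that the process is listening, not that its engines have finished loading, so this is a weaker guarantee than an application-level health check.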