Chapter 18.3: Protobuf Integration

18.3.1 The urban_sim.proto Schema

Protocol Buffers provide the canonical data contract between all system components. The schema defines every message that crosses a process boundary, ensuring type safety between Python physics engines, Java SUMO controllers, and any future components.

syntax = "proto3";

package urban_sim;

// ──────────── Core Data Types ────────────

message VehicleState {
  string id = 1;
  double x = 2;
  double y = 3;
  double speed = 4;           // m/s
  double acceleration = 5;    // m/s^2
  double angle = 6;           // degrees
  string edge_id = 7;
  string lane_id = 8;
  double nox_emission = 9;    // mg/s
  double pmx_emission = 10;   // mg/s
  double co2_emission = 11;   // mg/s
}

message SimStep {
  double time = 1;            // simulation time (s)
  repeated VehicleState vehicles = 2;
  int32 step_number = 3;
}

// ──────────── Canyon Pollution ────────────

message CanyonGeometry {
  string edge_id = 1;
  double height = 2;          // H (m)
  double width = 3;           // W (m)
  double length = 4;          // L (m)
  double wind_speed = 5;      // u_H (m/s)
  double wind_direction = 6;  // degrees from canyon axis
}

message PollutionRequest {
  repeated CanyonGeometry canyons = 1;
  repeated VehicleState vehicles = 2;
  double background_no2 = 3;  // μg/m³
}

message CanyonPollution {
  string edge_id = 1;
  double c_direct = 2;       // μg/m³
  double c_recirculation = 3; // μg/m³
  double c_total = 4;        // μg/m³
  double emission_density = 5; // mg/m/s
  bool exceeds_who = 6;
}

message PollutionResponse {
  repeated CanyonPollution results = 1;
  double computation_time_ms = 2;
}

// ──────────── MFG Routing ────────────

message MFGRequest {
  repeated EdgeDensity densities = 1;
  repeated CanyonPollution pollution = 2;
  double lambda_pollution = 3;  // pollution cost weight
}

message EdgeDensity {
  string edge_id = 1;
  double density = 2;        // veh/km
  double flow = 3;           // veh/hr
}

message RouteUpdate {
  string vehicle_id = 1;
  repeated string edge_ids = 2;  // new route
}

message MFGResponse {
  repeated RouteUpdate routes = 1;
  double nash_gap = 2;        // MFG convergence metric
  double computation_time_ms = 3;
}

// ──────────── Signal Control ────────────

message SignalRequest {
  string tls_id = 1;
  repeated QueueLength queues = 2;
  repeated CanyonPollution nearby_pollution = 3;
  double current_time = 4;
}

message QueueLength {
  string lane_id = 1;
  int32 queue_count = 2;
  double queue_length_m = 3;
}

message SignalResponse {
  string tls_id = 1;
  int32 phase_index = 2;
  double green_time = 3;      // seconds
  double computation_time_ms = 4;
}

// ──────────── Service Definition ────────────

service UrbanPhysicsService {
  // Unary RPCs
  rpc ComputePollution(PollutionRequest) returns (PollutionResponse);
  rpc ComputeRoutes(MFGRequest) returns (MFGResponse);
  rpc ComputeSignal(SignalRequest) returns (SignalResponse);

  // Bidirectional streaming (for real-time coupling)
  rpc StreamSimulation(stream SimStep) returns (stream PollutionResponse);
}

18.3.2 Bidirectional Streaming

The StreamSimulation RPC uses gRPC bidirectional streaming. The SUMO controller streams SimStep messages (vehicle states at each timestep), and the physics server responds with PollutionResponse messages:

SUMO Controller (Java)           Physics Server (Python)
       │                                   │
       │──── SimStep(t=0, vehicles) ────→  │
       │                                   │── compute OSPM
       │  ←── PollutionResponse(t=0) ──── │
       │                                   │
       │──── SimStep(t=1, vehicles) ────→  │
       │                                   │── compute OSPM + MFG
       │  ←── PollutionResponse(t=1) ──── │
       │  ←── MFGResponse(routes) ──────  │
       │                                   │
       ⋮                                   ⋮

Streaming avoids the overhead of establishing a new request for each timestep. The gRPC HTTP/2 connection stays open, and messages are multiplexed on the same socket.

18.3.3 Python PhysicsServicer

The Python server dispatches incoming requests to specialised engines. Each engine corresponds to a module in this course:

class UrbanPhysicsServicer(urban_sim_pb2_grpc.UrbanPhysicsServiceServicer):
    """gRPC servicer dispatching to physics engines."""

    def __init__(self):
        self.pollution_engine = CanyonPollutionEngine()   # Module 16
        self.mfg_engine = MFGRoutingEngine()              # Module 12
        self.signal_engine = SignalControlEngine()         # Module 13

    def ComputePollution(self, request, context):
        t0 = time.perf_counter()
        results = []
        for canyon in request.canyons:
            # Aggregate vehicle emissions on this edge
            edge_vehicles = [v for v in request.vehicles
                           if v.edge_id == canyon.edge_id]
            S_e = sum(v.nox_emission for v in edge_vehicles) / max(canyon.length, 1)

            # OSPM computation
            conc = self.pollution_engine.ospm(S_e, canyon, request.background_no2)
            results.append(conc)

        dt = (time.perf_counter() - t0) * 1000
        return PollutionResponse(results=results, computation_time_ms=dt)

    def ComputeRoutes(self, request, context):
        t0 = time.perf_counter()
        routes = self.mfg_engine.solve(request.densities, request.pollution,
                                        request.lambda_pollution)
        dt = (time.perf_counter() - t0) * 1000
        return MFGResponse(routes=routes, nash_gap=self.mfg_engine.gap,
                          computation_time_ms=dt)

    def ComputeSignal(self, request, context):
        t0 = time.perf_counter()
        phase, green = self.signal_engine.pmp_control(request)
        dt = (time.perf_counter() - t0) * 1000
        return SignalResponse(tls_id=request.tls_id, phase_index=phase,
                            green_time=green, computation_time_ms=dt)

    def StreamSimulation(self, request_iterator, context):
        """Bidirectional streaming: process SimSteps, yield PollutionResponses."""
        for sim_step in request_iterator:
            # Process each timestep
            pollution = self._process_step(sim_step)
            yield pollution

            # Periodically update routes (every 30 steps)
            if sim_step.step_number % 30 == 0:
                self._update_mfg_routes(sim_step)

18.3.4 Java SUMOController with gRPC Client

public class SUMOController {
    private final Socket traciSocket;          // Binary TraCI to SUMO
    private final ManagedChannel grpcChannel;  // gRPC to Python physics
    private final UrbanPhysicsServiceGrpc.UrbanPhysicsServiceStub asyncStub;

    public SUMOController(String sumoHost, int sumoPort,
                          String physicsHost, int physicsPort) {
        // TraCI connection
        traciSocket = new Socket(sumoHost, sumoPort);

        // gRPC connection
        grpcChannel = ManagedChannelBuilder
            .forAddress(physicsHost, physicsPort)
            .usePlaintext()
            .build();
        asyncStub = UrbanPhysicsServiceGrpc.newStub(grpcChannel);
    }

    public void runSimulation(int nSteps) {
        StreamObserver<SimStep> requestStream = asyncStub.streamSimulation(
            new StreamObserver<PollutionResponse>() {
                @Override
                public void onNext(PollutionResponse response) {
                    // Apply pollution-aware rerouting via TraCI
                    for (CanyonPollution p : response.getResultsList()) {
                        if (p.getExceedsWho()) {
                            rerouteVehiclesFromEdge(p.getEdgeId());
                        }
                    }
                }

                @Override
                public void onCompleted() { /* stream done */ }

                @Override
                public void onError(Throwable t) { t.printStackTrace(); }
            }
        );

        for (int step = 0; step < nSteps; step++) {
            // 1. Advance SUMO via TraCI
            traciSimulationStep();

            // 2. Collect vehicle states via TraCI
            List<VehicleState> vehicles = getVehicleStates();

            // 3. Send to physics server via gRPC
            SimStep simStep = SimStep.newBuilder()
                .setTime(step * 1.0)
                .setStepNumber(step)
                .addAllVehicles(vehicles)
                .build();
            requestStream.onNext(simStep);
        }

        requestStream.onCompleted();
    }
}

18.3.5 Deployment: startup.sh

The startup script orchestrates the three processes in the correct order:

#!/bin/bash
# startup.sh — Launch the integrated urban simulation
set -e

SUMO_CFG="city.sumocfg"
PHYSICS_PORT=50051
TRACI_PORT=8813
JAVA_JAR="sumo-controller.jar"

echo "=== Urban Simulation Startup ==="

# Step 1: Start Python physics server (gRPC)
echo "[1/3] Starting physics server on port $PHYSICS_PORT..."
python3 physics_server.py \
    --port $PHYSICS_PORT \
    --canyon-config canyon_geometries.json \
    --workers 4 &
PHYSICS_PID=$!
sleep 2  # Wait for server to be ready

# Step 2: Start SUMO with TraCI
echo "[2/3] Starting SUMO with TraCI on port $TRACI_PORT..."
sumo -c $SUMO_CFG \
    --remote-port $TRACI_PORT \
    --emission-output emission.xml \
    --step-length 1.0 \
    --num-clients 1 &
SUMO_PID=$!
sleep 1

# Step 3: Start Java controller
echo "[3/3] Starting Java SUMO controller..."
java -jar $JAVA_JAR \
    --sumo-host localhost --sumo-port $TRACI_PORT \
    --physics-host localhost --physics-port $PHYSICS_PORT \
    --steps 3600

echo "=== Simulation Complete ==="

# Cleanup
kill $PHYSICS_PID $SUMO_PID 2>/dev/null || true

18.3.6 Latency Budget Analysis

The total time per simulation step must stay within the step length to maintain real-time (or faster) execution:

$$\tau_{\text{step}} = \tau_{\text{SUMO}} + \tau_{\text{TraCI}} + \tau_{\text{bridge}} + \tau_{\text{physics}} + \tau_{\text{control}}$$

For 1000 vehicles at 1-second simulation steps:

ComponentTime (ms)% of BudgetScales With
\(\tau_{\text{SUMO}}\): simulation step~202.0%Vehicles, network size
\(\tau_{\text{TraCI}}\): state collection~50.5%Vehicles queried
\(\tau_{\text{bridge}}\): gRPC transfer~0.30.03%Message size
\(\tau_{\text{physics}}\): OSPM + k-epsilon~505.0%Edges with canyons
\(\tau_{\text{control}}\): MFG + signal~10010.0%Network, MFG iterations
Total~175.317.5%
Headroom~824.782.5%

With 82.5% headroom, the system can comfortably handle 1000 vehicles at 1-second steps. The real bottleneck appears at ~5000 vehicles, where SUMO itself takes ~100 ms and the physics engine takes ~250 ms for all canyon edges.

18.3.7 Python: Complete Physics Servicer

This code implements the full physics servicer, including the three engines (CanyonPollutionEngine, MFGRoutingEngine, SignalControlEngine) and a latency benchmark simulating realistic workloads.

Complete Physics Servicer with Latency Benchmark

Python
script.py311 lines

Click Run to execute the Python code

Code will be executed with Python 3 on the server

18.3.8 Key Takeaways

  • The urban_sim.proto schema defines all inter-process messages: VehicleState, SimStep, PollutionRequest/Response, MFGRequest/Response, SignalRequest/Response.
  • Bidirectional streaming (stream SimStep → stream PollutionResponse) eliminates per-request overhead for the inner simulation loop.
  • The PhysicsServicer dispatches to three engines: CanyonPollutionEngine (Module 16), MFGRoutingEngine (Module 12), and SignalControlEngine (Module 13).
  • The Java SUMOController combines binary TraCI (to SUMO) with gRPC (to Python physics), bridging the Java and Python ecosystems.
  • The latency budget for 1000 vehicles at 1-second steps leaves over 80% headroom, with SUMO taking ~20 ms and physics ~50–100 ms.
  • The startup.sh script orchestrates the three processes: Python server, SUMO, Java controller, in the correct dependency order.