13.6.26

Raspberry Pi MQTT Broker TR-369 Controller Sniffer Protobuf

在Raspberry Pi 3B上跑docker模擬TR-369實驗,一個發送一個接收



 ~/usp-test/

├── docker-compose.yml

├── mosquitto.conf

└── app/

    ├── controller.py   (發送控制指令)

    └── sniffer.py      (側錄封包)

-------------------------------------------------------------------------------------------------------------

app/controller.py

-----------------------------------------------------------------------------------------------------------------

import paho.mqtt.client as mqtt

import time


# TR-369 在 MQTT 上的標準 Topic 格式通常是: v1/usp/agent/<Endpoint-ID>

# 發送一個假的 Agent 節點

TOPIC_GET      = "v1/usp/agent/pi3b-device/get"

TOPIC_OPERATE  = "v1/usp/agent/pi3b-device/operate"


def run_experiment():

    client = mqtt.Client()

    # 連線到本地 Docker 跑的 Mosquitto

    client.connect("127.0.0.1", 1883, 60)

    client.loop_start()

    

    print("--- 實驗開始:發送 TR-369 模擬指令 ---")

    

    # 實驗 1:模擬 Get 指令 (查詢 Wireshark 參數)

    # 這段 Hex 模擬 Protobuf 序列化的 TR-369 結構 (包含 Header 與 Get Request)

    fake_protobuf_get = bytes.fromhex("0a050801120131121d0a1b0a194465766963652e446576696365496e666f2e536f66747761726556657273696f6e")

    client.publish(TOPIC_GET, fake_protobuf_get, qos=1)

    print(f"[1] 已發送 Get 指令至 {TOPIC_GET} (查詢 Device.DeviceInfo.SoftwareVersion)")

    time.sleep(2)

    

    # 實驗 2:模擬 Operate 指令 (觸發重啟設備)

    fake_protobuf_operate = bytes.fromhex("0a05080112013212130a110a0f4465766963652e426f6f742829")

    client.publish(TOPIC_OPERATE, fake_protobuf_operate, qos=1)

    print(f"[2] 已發送 Operate 指令至 {TOPIC_OPERATE} (觸發 Device.Boot())")

    

    time.sleep(1)

    client.loop_stop()

    client.disconnect()

    print("--- 實驗結束 ---")


if __name__ == "__main__":

    run_experiment()

------------------------------------------------------------------------------------------------------------

app/sniffer.py

-----------------------------------------------------------------------------------------------------------

import paho.mqtt.client as mqtt

import time

import os


SAVE_DIR = "/app/captured_packets"

os.makedirs(SAVE_DIR, exist_ok=True)


def on_connect(client, userdata, flags, rc):

    print("【Sniffer 啟動】連線至 MQTT Broker,開始側錄 v1/usp/# ...")

    client.subscribe("v1/usp/#")


def on_message(client, userdata, msg):

    timestamp = int(time.time())

    # 將 Topic 中的斜線換成底線,做成安全的檔名

    safe_topic = msg.topic.replace("/", "_")

    filename = f"{SAVE_DIR}/usp_{timestamp}_{safe_topic}.bin"

    

    # 將原始二進位位元組 (Protobuf) 寫入檔案

    with open(filename, "wb") as f:

        f.write(msg.payload)

        

    print(f" [成功] 主題: {msg.topic} | 長度: {len(msg.payload)} bytes -> 儲存為 {filename}")


if __name__ == "__main__":

    client = mqtt.Client()

    client.on_connect = on_connect

    client.on_message = on_message

    

    client.connect("127.0.0.1", 1883, 60)

    client.loop_forever()

----------------------------------------------------------------------------------------------------------------------

docker-compose.yml

-------------------------------------------------------------------------------------------------------------------------

services:

  # 使用 32 位元萬用 Python 鏡像來建構我們的 CoAP 模擬環境

  coap-agent:

    image: python:3.11-slim

    container_name: usp-coap-agent

    restart: always

    # 使用 host 模式,讓 UDP 5683 直接綁在網卡上

    network_mode: "host" 

    volumes:

      - ./app:/app

      - ./logs:/var/log/usp

    command: sh -c "pip install --no-cache-dir aiocoap LinkHeader && tail -f /dev/null"

    deploy:

      resources:

        limits:

          memory: 128M # 限制記憶體

------------------------------------------------------------------------------------------------------------------

執行容器    #docker compose up -d

開接收端     #docker compose exec -it usp-mock-node python /app/sniffer.py

執行發送端    #docker compose exec -it usp-mock-node python /app/controller.py

要用SSH遠端連線開兩個視窗,然後再把封包傳回來用Wireshark分析

沒有留言:

張貼留言