通过网关为 PHP-FPM 插上 WebSocket 的翅膀

2025/01/07 PHP

众所周知,运行在PHP-FPM模式下的PHP代码并非常驻内存,而WebSocket实时通信又需要常驻内存,可以说PHP-FPM模式跟 WebSocket就走不到一块去。

虽然可以直接使用AMPHPREACTPHPSwoole等众多PHP-CLI的库和扩展来让PHP处理WebSocket业务,但这就相当于做一个新项目了,跟原有的PHP-FPM项目不能很好地兼容。通过WebSocket网关跟WebSocket客户端交互,具体的业务仍然由PHP-FPM框架处理,不仅能以零改动的方式让PHP-FPM拥有了处理WebSocket协议的能力,还不会存在PHP-FPMPHP-CLI之间生态不兼容的问题。当然最重要的就是不需要更换框架。

架构图如下所示。

websocket-gateway

WebSocket 客户端WebSocket 网关建立连接,WebSocket 网关接收到WebSocket 客户端发送的数据,通过HTTP协议将数据发送到NginxNginx再用FastCGI协议发送给PHP-FPMPHP脚本处理完将数据沿原路反方向回传到WebSocket 客户端,这是接收逻辑;如果希望PHP主动推送数据到WebSocket 客户端,则需要额外的HTTP 客户端,因为PHP-FPM不适合持续运行推送,不过这个并不算关键的功能,理论上完全可以通过WebSocket 网关来定时触发,此处不作详细说明。主动推送时,HTTP 客户端Nginx发起HTTP请求,由PHP-FPM处理,如果PHP脚本判断该请求需要推送到WebSocket 客户端,则将数据发送至WebSocket 网关对内暴露的HTTP接口,WebSocket 网关根据请求选择对应的WebSocket 客户端通信。

PHP 业务代码

websocket.phpPHPWebSocket 客户端交互的业务逻辑,访问链接为http://localhost/websocket.php 。除了要区分接收和发送行为之外,其它逻辑与一般PHP-FPM项目无异。其中$url变量为WebSocket 网关开放的接口,该接口供PHP主动推送消息给WebSocket 客户端

<?php
// websocket.php

if (
    !empty($_POST['type'])
    && !empty($_POST['client_id'])
    && isset($_POST['message'])
) {
    if ($_POST['type'] == 'client') {
        // 接收客户端消息,推送到 client_id 对应的 WebSocket 客户端
        $url = 'http://host.docker.internal:8080/send';
        $cmd = sprintf('curl -d "message=%s&type=client&client_id=%s" %s', $_POST['message'], $_POST['client_id'], $url);
        system($cmd);
        echo PHP_EOL, $cmd, PHP_EOL;
    } else {
        // 接收处理 WebSocket 客户端消息
        switch ($_POST['message']) {
        case 'name':
            echo 'lwlinux';
            break;
        default:
            echo 'default';
        }

        echo ' to '. $_POST['client_id'];
    }
} else {
    echo 'something wrong';
}

WebSocket 网关

WebSocket 网关的核心功能有两个,一个面向WebSocket 客户端,也就是ws://localhost:8080/ws,对连接进行保活;另一个面向PHP,提供PHP主动推送的接口,将数据转发到WebSocket 客户端,即http://host.docker.internal:8080/send ,该接口接收POST请求,表单参数包括messageclient_idWebSocket 客户端的唯一标识)。启动前,需要提前知道PHP服务地址http://localhost/websocket.php

package main

import (
    "bytes"
    "fmt"
    "github.com/gorilla/websocket"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
    "time"
)

const (
    phpFpmURL = "http://localhost/websocket.php"
    wsAddr    = "localhost:8080"
    wsPath    = "/ws"
)

type WebSocketGateway struct {
    connections sync.Map
}

func (gw *WebSocketGateway) Start() {
    http.HandleFunc(wsPath, gw.handleWebSocket)
    http.HandleFunc("/send", gw.handleSendMessage)
    log.Printf("WebSocket server started at ws://%s%s", wsAddr, wsPath)
    err := http.ListenAndServe(wsAddr, nil)
    if err != nil {
        log.Fatalf("Error starting WebSocket server: %v", err)
    }
}

func (gw *WebSocketGateway) handleWebSocket(w http.ResponseWriter, r *http.Request) {
    upGrader := websocket.Upgrader{
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }

    conn, err := upGrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println("Upgrade error:", err)
        return
    }
    defer conn.Close()

    clientID := fmt.Sprintf("%s", conn.RemoteAddr().String())
    gw.connections.Store(clientID, conn)
    log.Printf("New WebSocket connection: %s", clientID)

    for {
        messageType, p, err := conn.ReadMessage()
        if err != nil {
            log.Printf("Read error for client %s: %v", clientID, err)
            break
        }

        log.Printf("Received message from client %s: %s", clientID, p)

        resp, err := sendToPhpFpm(string(p), clientID)
        if err != nil {
            log.Println("Error communicating with PHP-FPM:", err)
            continue
        }

        err = conn.WriteMessage(messageType, resp)
        if err != nil {
            log.Printf("Write error for client %s: %v", clientID, err)
            break
        }
    }

    gw.connections.Delete(clientID)
    log.Printf("WebSocket connection closed: %s", clientID)
}

func sendToPhpFpm(message, clientID string) ([]byte, error) {
    reqBody := fmt.Sprintf("type=server&message=%s&client_id=%s", message, clientID)
    req, err := http.NewRequest("POST", phpFpmURL, bytes.NewBuffer([]byte(reqBody)))
    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    client := &http.Client{Timeout: 10 * time.Second}
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    respBody, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    return respBody, nil
}

func (gw *WebSocketGateway) handleSendMessage(w http.ResponseWriter, r *http.Request) {
    err := r.ParseForm()
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        w.Write([]byte(http.StatusText(http.StatusBadRequest)))
        return
    }

    clientID := r.Form.Get("client_id")
    message := r.Form.Get("message")
    _ = r.Form.Get("type")

    conn, ok := gw.connections.Load(clientID)
    if !ok {
        http.Error(w, "Client not found", http.StatusNotFound)
        return
    }

    wsConn := conn.(*websocket.Conn)

    err = wsConn.WriteMessage(websocket.TextMessage, []byte(message))
    if err != nil {
        http.Error(w, "Failed to send message", http.StatusInternalServerError)
        return
    }

    w.Write([]byte("Message sent"))
}

func main() {
    gateway := &WebSocketGateway{}
    gateway.Start()
}

首先运行WebSocket 网关

WebSocket 客户端

在浏览器控制台执行以下代码。

var ws = new WebSocket('ws://127.0.0.1:8080/ws')
ws.onopen = function (params) {
    ws.send('name')
    ws.send('')
}
ws.onmessage = function (params) {
    console.log(params.data)
}
ws.onclose = function (params) {
    console.log('close')
}
ws.onerror = function (params) {
    console.log('error')
}

输出以下内容。

lwlinux to 127.0.0.1:51110
default to 127.0.0.1:51110

PHP 推送消息到 WebSocket 客户端

通过curl调用PHP代码,client_id根据实际情况修改,通过浏览器调用PHP发送消息同理。

$ curl -d 'message=hello_world_from_curl&client_id=127.0.0.1:51110&type=client' http://localhost/websocket.php
Message sent
curl -d "message=hello_world_from_curl&type=client&client_id=127.0.0.1:51110" http://host.docker.internal:8080/send

这时,WebSocket 客户端应该会输出该文本hello_world_from_curl

这个思路不仅可以用于为PHP-FPM添加WebSocket功能,还可以为PHP-FPM做连接池等等。

Search

    Table of Contents