众所周知,运行在PHP-FPM模式下的PHP代码并非常驻内存,而WebSocket实时通信又需要常驻内存,可以说PHP-FPM模式跟
WebSocket就走不到一块去。
虽然可以直接使用AMPHP、REACTPHP、Swoole等众多PHP-CLI的库和扩展来让PHP处理WebSocket业务,但这就相当于做一个新项目了,跟原有的PHP-FPM项目不能很好地兼容。通过WebSocket网关跟WebSocket客户端交互,具体的业务仍然由PHP-FPM框架处理,不仅能以零改动的方式让PHP-FPM拥有了处理WebSocket协议的能力,还不会存在PHP-FPM和PHP-CLI之间生态不兼容的问题。当然最重要的就是不需要更换框架。
架构图如下所示。

WebSocket 客户端跟WebSocket 网关建立连接,WebSocket 网关接收到WebSocket 客户端发送的数据,通过HTTP协议将数据发送到Nginx,Nginx再用FastCGI协议发送给PHP-FPM,PHP脚本处理完将数据沿原路反方向回传到WebSocket 客户端,这是接收逻辑;如果希望PHP主动推送数据到WebSocket 客户端,则需要额外的HTTP 客户端,因为PHP-FPM不适合持续运行推送,不过这个并不算关键的功能,理论上完全可以通过WebSocket 网关来定时触发,此处不作详细说明。主动推送时,HTTP 客户端向Nginx发起HTTP请求,由PHP-FPM处理,如果PHP脚本判断该请求需要推送到WebSocket 客户端,则将数据发送至WebSocket 网关对内暴露的HTTP接口,WebSocket 网关根据请求选择对应的WebSocket 客户端通信。
PHP 业务代码
websocket.php为PHP和WebSocket 客户端交互的业务逻辑,访问链接为http://localhost/websocket.php 。除了要区分接收和发送行为之外,其它逻辑与一般PHP-FPM项目无异。其中$url变量为WebSocket 网关开放的接口,该接口供PHP主动推送消息给WebSocket 客户端。
<?php
// websocket.php
if (
!empty($_POST['type'])
&& !empty($_POST['client_id'])
&& isset($_POST['message'])
) {
if ($_POST['type'] == 'client') {
// 接收客户端消息,推送到 client_id 对应的 WebSocket 客户端
$url = 'http://host.docker.internal:8080/send';
$cmd = sprintf('curl -d "message=%s&type=client&client_id=%s" %s', $_POST['message'], $_POST['client_id'], $url);
system($cmd);
echo PHP_EOL, $cmd, PHP_EOL;
} else {
// 接收处理 WebSocket 客户端消息
switch ($_POST['message']) {
case 'name':
echo 'lwlinux';
break;
default:
echo 'default';
}
echo ' to '. $_POST['client_id'];
}
} else {
echo 'something wrong';
}
WebSocket 网关
WebSocket 网关的核心功能有两个,一个面向WebSocket 客户端,也就是ws://localhost:8080/ws,对连接进行保活;另一个面向PHP,提供PHP主动推送的接口,将数据转发到WebSocket 客户端,即http://host.docker.internal:8080/send ,该接口接收POST请求,表单参数包括message、client_id(WebSocket 客户端的唯一标识)。启动前,需要提前知道PHP服务地址http://localhost/websocket.php 。
package main
import (
"bytes"
"fmt"
"github.com/gorilla/websocket"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
const (
phpFpmURL = "http://localhost/websocket.php"
wsAddr = "localhost:8080"
wsPath = "/ws"
)
type WebSocketGateway struct {
connections sync.Map
}
func (gw *WebSocketGateway) Start() {
http.HandleFunc(wsPath, gw.handleWebSocket)
http.HandleFunc("/send", gw.handleSendMessage)
log.Printf("WebSocket server started at ws://%s%s", wsAddr, wsPath)
err := http.ListenAndServe(wsAddr, nil)
if err != nil {
log.Fatalf("Error starting WebSocket server: %v", err)
}
}
func (gw *WebSocketGateway) handleWebSocket(w http.ResponseWriter, r *http.Request) {
upGrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := upGrader.Upgrade(w, r, nil)
if err != nil {
log.Println("Upgrade error:", err)
return
}
defer conn.Close()
clientID := fmt.Sprintf("%s", conn.RemoteAddr().String())
gw.connections.Store(clientID, conn)
log.Printf("New WebSocket connection: %s", clientID)
for {
messageType, p, err := conn.ReadMessage()
if err != nil {
log.Printf("Read error for client %s: %v", clientID, err)
break
}
log.Printf("Received message from client %s: %s", clientID, p)
resp, err := sendToPhpFpm(string(p), clientID)
if err != nil {
log.Println("Error communicating with PHP-FPM:", err)
continue
}
err = conn.WriteMessage(messageType, resp)
if err != nil {
log.Printf("Write error for client %s: %v", clientID, err)
break
}
}
gw.connections.Delete(clientID)
log.Printf("WebSocket connection closed: %s", clientID)
}
func sendToPhpFpm(message, clientID string) ([]byte, error) {
reqBody := fmt.Sprintf("type=server&message=%s&client_id=%s", message, clientID)
req, err := http.NewRequest("POST", phpFpmURL, bytes.NewBuffer([]byte(reqBody)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return respBody, nil
}
func (gw *WebSocketGateway) handleSendMessage(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(http.StatusText(http.StatusBadRequest)))
return
}
clientID := r.Form.Get("client_id")
message := r.Form.Get("message")
_ = r.Form.Get("type")
conn, ok := gw.connections.Load(clientID)
if !ok {
http.Error(w, "Client not found", http.StatusNotFound)
return
}
wsConn := conn.(*websocket.Conn)
err = wsConn.WriteMessage(websocket.TextMessage, []byte(message))
if err != nil {
http.Error(w, "Failed to send message", http.StatusInternalServerError)
return
}
w.Write([]byte("Message sent"))
}
func main() {
gateway := &WebSocketGateway{}
gateway.Start()
}
首先运行WebSocket 网关。
WebSocket 客户端
在浏览器控制台执行以下代码。
var ws = new WebSocket('ws://127.0.0.1:8080/ws')
ws.onopen = function (params) {
ws.send('name')
ws.send('')
}
ws.onmessage = function (params) {
console.log(params.data)
}
ws.onclose = function (params) {
console.log('close')
}
ws.onerror = function (params) {
console.log('error')
}
输出以下内容。
lwlinux to 127.0.0.1:51110
default to 127.0.0.1:51110
PHP 推送消息到 WebSocket 客户端
通过curl调用PHP代码,client_id根据实际情况修改,通过浏览器调用PHP发送消息同理。
$ curl -d 'message=hello_world_from_curl&client_id=127.0.0.1:51110&type=client' http://localhost/websocket.php
Message sent
curl -d "message=hello_world_from_curl&type=client&client_id=127.0.0.1:51110" http://host.docker.internal:8080/send
这时,WebSocket 客户端应该会输出该文本hello_world_from_curl。
这个思路不仅可以用于为PHP-FPM添加WebSocket功能,还可以为PHP-FPM做连接池等等。