Go 语言实现 Redis 客户端

2020/04/02 计算机网络 Go

除 Telnet 以及 Go 代码部分之外,其它内容基本翻译自官网。另外,写完本文之后,发现一篇文章写得更好,代码当然也比我的更好,传送门 Reading and Writing Redis Protocol in Go。路漫漫,我还要更努力。

其实去年就看过实现 Redis 客户端的一些文章,但是由于当前对网络、协议方面不太熟悉,只是产生了一点感觉。现在终于可以自己实现了,心里有点小激动。其实自撸客户端除了对网络熟悉之外,对编译原理最好也有一定了解,如状态机啥的,不然代码会有点难看,当然我也仅仅有粗浅的了解。计算机网络,操作系统,编译原理,算法和数据结构,计算机组成原理,几座大山一定要翻越,go go go~当然,软件设计功力也要加强,后面有空还要玩数据库,压力山大。话不多说,现在进入正题吧。

Redis 通信协议介绍

Redis 客户端使用 RESP(REdis Serialization Protocol) 协议跟 Redis 服务端进行通信。RESP 协议是专门为 Redis 而设计的,当然,它也可以被其他 CS 架构软件项目使用。

RESP 在以下几方面进行了折衷。

  • 易于实现
  • 解析快速
  • 可读的

RESP 可以序列化不同的数据类型,如 Integers, Strings, Arrays 以及一个特别的错误类型 Errors。客户端发送到服务端的请求以字符串数组的形式表示即将要执行的命令。

RESP 协议仅用于 CS 通信。Redis 集群为了在节点间交换数据,所以用的是另一个二进制协议。

### 网络层 客户端通过创建端口号为 6379 的 TCP 连接来连接到 Redis 服务端。虽然 RESP 在技术上并没有指定用于 TCP,但在 Redis 的上下中,该协议仅用于 TCP 连接(或因似于 Unix 套接字的面向流的连接)。

请求-响应模型

Redis 接收由不同参数组成的命令。一旦接收到命令,就会进行处理并将响应发送回客户端。

这可能是最简单的模型,但还有两个例外:

  • Redis 支持管道。所以客户端可以一次性发送多个命令,并等待响应
  • 一旦客户端订阅了 Pub/Sub 频道,协议就会变成一个推协议,也就是说,这时候客户端不需要发送命令,因为服务端一旦接收到(客户端订阅的消息,需要其它客户端发布)就会自动向(订阅该频道的客户端发送新消息

除此之外,Redis 协议就只是一个简单的请求-响应协议。

RESP 协议说明

RESP 协议最早在 Redis 1.2 中实现,在 Redis 2.0 中才正式成为(客户端)与 Redis 服务端通信的标准方式。

RESP 是序列化协议,支持以下数据类型:Simple Strings, Errors, Integers, Bulk Strings 和 Arrays。

RESP 在 Redis 中作为请求-响应协议用法如下:

  • 客户端向 Redis 服务端以 REAP Arrays 形式发送 Bulk Strings
  • 服务端根据命令的实现来响应其中一种 RESP 类型

在 RESP 中,一些数据的类型取决于第一个字节:

  • Simple Strings 响应的第一个字节是+
  • Errors 响应的第一个字节是-
  • Integers 响应的第一个字节是:
  • Bulk Strings 响应的第一个字节是$
  • Arrays 响应的第一个字节是*

除此之外,RESP 也可能使用一个特别的 Bulk Strings 或 Arrays 变体来代表 Null 值。在 RESP 中协议的不同部分使用\r\n(CRLF)来结束。

RESP Simple Strings

Simple Strings 编码方式:+符号,后面跟着不含 CR 或 LF 字符的字符串,最后以 CRLF(\r\n)结束。示例如下:

"+OK\r\n"

Simple Strings 用来传输非二进制安全字符串。

RESP Errors

Errors 跟 Simple Strings 很像,但第一个字符是-。示例如下:

"-Errors message\r\n"

RESP Integers

Integers 以:开头。示例如下:

":1000\r\n"

RESP Bulk Strings

Bulk Strings 用来表示一个最大为 512MB 的二进制安全的字符串,用以下方式进行编码:

  • $字符开头,后面跟着字符中的长度,以 CRLF 结束
  • 具体的字符串数据
  • 最后的 CRLF

因此,字符串 foobar 编码如下:

"$6\r\nfoobar\r\n"

空字符串如下:

"$0\r\n\r\n"

空字符串也叫 Null Bulk Strings。

客户端 API 不应该返回一个空字符串,而应该是 nil 对象。

RESP Arrays

客户端以 RESP Arrays 的形式向 Redis 服务端发送命令,同理,Redis 服务端也会用 RESP Arrays 响应客户端。

RESP Arrays 用以下格式发送:

  • *作为首字节,后面跟着表示数组元素个数的十进制数,再跟着 CRLF
  • 数组元素额外的 RESP 类型

空 Array 表示方式如下:

"*0\r\n"

又例如,包含两个 RESP Bulk String 元素 foo 和 bar 的数组编码如下:

"*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"

可以看到在*<count>CRLF部分之后,组成 Arrays 的其他类型仅仅是一个连着一个罢了。例如,由三个整型数据组成的 Arrays 编码如下:

"*3\r\n:1\r\n:2\r\n:3\r\n"

Arrays 可以包含多个类型。例如,四个整型和一个 Bulk Strings 组成的列表可以编码如下:

*5\r\n:1\r\n:2\r\n:3\r\n:4\r\n$6\r\nfoobar\r\n"

Arrays 中的 Null 元素

在 Redis 中为了表示元素不存在,使用的是 Null,而不是空字符串。例如:

*3\r\n
$3\r\n
foo\r\n
$-1\r\n
$3\r\n
bar\r\n

第二个元素$-1\r\n是 Null,客户端库应该用类似于以下的形式返回:

["foo",nil,"bar"]

使用 Telnet 向 Redis 服务器发送命令

熟悉了 RESP 序列化格式之后,实现 Redis 客户端库就比较容易了。

下面指定客户端和服务端的两个交互:

  • 客户端向 Redis 服务端发送只由 Bulk Strings 组成的 RESP Arrays
  • Redis 服务端可以响应客户端任何有效的 RESP 数据类型

以下是一个简单的示例,默认 Redis 在运行。

$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.

*3
$3
SET
$5
hello
$5
world
+OK

*2
$3
GET
$5
hello
$5
world

*2
$3
GET
$5
hahah
$-1

PING
+PONG

以上是四条命令,第一条请求服务端设置 hello 的值为 world,服务端响应+OK\r\n,类型是 Simple Strings;第二条请求获取刚才设置的 hello 的值,服务端响应$5\r\nworld\r\n,类型是 Bulk Strings;第三条请求获取不存在的 hahah,服务端响应$-1\r\n,也就是 Null。最后一条是特别的命令。

Go 语言实现 Redis 客户端

// redis/redis.go
package redis

import (
    "fmt"
    "log"
    "net"
    "time"
)

type Client struct {
    Conn net.Conn
    Host string
    Port string
}

func (c *Client) Connect() {
    if c.Host == "" || c.Port == "" {
        c.Host = "127.0.0.1"
        c.Port = "6379"
    }

    conn, err := net.DialTimeout("tcp", c.Host+":"+c.Port, 10*time.Second)
    fatal(err)
    c.Conn = conn
}

func (c *Client) Set(name, value string) (string, error) {
    action := "set"
    cmd := assemble([]string{
        action,
        name,
        value,
    })

    return c.send(action, cmd)
}

func (c *Client) Get(name string) (string, error) {
    action := "get"
    cmd := assemble([]string{
        action,
        name,
    })

    return c.send(action, cmd)
}

func assemble(cmd []string) string {
    cmdstr := fmt.Sprintf("*%d\r\n", len(cmd))

    for _, v := range cmd {
        cmdstr += fmt.Sprintf("$%d\r\n%s\r\n", len(v), v)
    }

    fmt.Printf("assemble: %q\n", cmdstr)

    return cmdstr
}

func (c *Client) send(logAction, cmd string) (string, error) {
    _, err := c.Conn.Write([]byte(cmd))
    if err != nil {
        log.Println(logAction, ":", err)
        return "", err
    }

    return c.response(), nil
}

func (c *Client) response() string {
    buf := make([]byte, 1024)
    n, _ := c.Conn.Read(buf)
    return string(buf[:n])
}

func fatal(err error) {
    if err != nil {
        log.Fatal(err)
    }
}
// main.go
package main

import (
    "fmt"
    "redis"
)

func main() {
    r := redis.Client{}

    r.Connect()

    data, _ := r.Set("gogogo", "gogogo value")
    fmt.Printf("set response: %q\n", data)

    data, _ = r.Get("gogogo")
    fmt.Printf("get response: %q\n", data)
}

补充:大神的实现

这节将传送门中的代码整理了下,其中 main.go 代码是自己写的,并且加了蹩脚的自创英文注释 orz…

// redis/writer.go
package redis

import (
    "bufio"
    "io"
    "strconv"
)

var (
    arrayPrefixSlice      = []byte{'*'}
    bulkStringPrefixSlice = []byte{'$'}
    lineEndingSlice       = []byte{'\r', '\n'}
)

type RESPWriter struct {
    *bufio.Writer
}

func NewRESPWriter(writer io.Writer) *RESPWriter {
    return &RESPWriter{
        Writer: bufio.NewWriter(writer),
    }
}

// 以 RESP Arrays 的形式向服务器发送命令
func (w *RESPWriter) WriteCommand(args ...string) (err error) {
    // Write the array prefix and the number of arguments in the array.
    // 根据传入的参数写入数组的长度 *len\r\n
    w.Write(arrayPrefixSlice)
    w.WriteString(strconv.Itoa(len(args)))
    w.Write(lineEndingSlice)

    // 循环参数,写入 $len\r\narg\r\n
    for _, arg := range args {
        w.Write(bulkStringPrefixSlice)
        w.WriteString(strconv.Itoa(len(arg)))
        w.Write(lineEndingSlice)
        w.WriteString(arg)
        w.Write(lineEndingSlice)
    }

    return w.Flush()
}
// redis/reader.go
package redis

import (
    "bufio"
    "bytes"
    "errors"
    "io"
    "strconv"
)

const (
    SIMPLE_STRING = '+'
    BULK_STRING   = '$'
    INTEGER       = ':'
    ARRAY         = '*'
    ERROR         = '-'
)

var (
    ErrInvalidSyntax = errors.New("resp: invalid syntax")
)

type RESPReader struct {
    *bufio.Reader
}

func NewReader(reader io.Reader) *RESPReader {
    return &RESPReader{
        Reader: bufio.NewReaderSize(reader, 32*1024),
    }
}

// 所有服务器数据都通过这个方法获取
func (r *RESPReader) ReadObject() ([]byte, error) {
    // Get the whole info of response message through the first line
    line, err := r.readLine()
    if err != nil {
        return nil, err
    }

    // Judge the type of response through the first byte
    switch line[0] {
    // 简单字符串,整数,错误这三种数据类型都只有一行
    case SIMPLE_STRING, INTEGER, ERROR:
        return line, nil
    // Bulk 字符串不止一行,所以需要另外操作
    case BULK_STRING:
        return r.readBulkString(line)
    // 数组也不止一行,另外操作
    case ARRAY:
        return r.readArray(line)
    default:
        return nil, ErrInvalidSyntax
    }
}

func (r *RESPReader) readLine() (line []byte, err error) {
    line, err = r.ReadBytes('\n')
    if err != nil {
        return nil, err
    }

    if len(line) > 1 && line[len(line)-2] == '\r' {
        return line, nil
    } else {
        // Line was too short or \n wasn't preceded by \r.
        return nil, ErrInvalidSyntax
    }
}

func (r *RESPReader) readBulkString(line []byte) ([]byte, error) {
    // Get the amount of bytes of the Bulk String
    // bulk 字符串的字节数量
    count, err := r.getCount(line)
    if err != nil {
        return nil, err
    }

    // Empty string
    if count == -1 {
        return line, nil
    }

    // The explanation that the first line of the code below:
    // len(line) is the line that describe the length of Bulk String($number)
    // count is the actual length of Bulk String
    // 2 indicates the length of \r\n
    //
    // so the code below will read the actual Bulk String and \r\n
    buf := make([]byte, len(line)+count+2) // len(line) 是首行的长度count 是 bulk 字符串的长度,2 是结尾的 \r\n
    copy(buf, line)
    _, err = io.ReadFull(r, buf[len(line):])
    if err != nil {
        return nil, err
    }

    return buf, nil
}

// Get the amount of elements
func (r *RESPReader) getCount(line []byte) (int, error) {
    end := bytes.IndexByte(line, '\r')
    return strconv.Atoi(string(line[1:end]))
}

func (r *RESPReader) readArray(line []byte) ([]byte, error) {
    // Get number of array elements.
    // 获取数组元素的数量
    count, err := r.getCount(line)
    if err != nil {
        return nil, err
    }

    // Read 'count' number of RESP objects in the array.
    // 根据已知的数组元素数量,获取对应数量的 Object
    for i := 0; i < count; i++ {
        buf, err := r.ReadObject()
        if err != nil {
            return nil, err
        }
        line = append(line, buf...)
    }

    return line, nil
}
// main.go
package main

import (
    "fmt"
    "redis"
    "log"
    "net"
    "os"
    "strconv"
    "time"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:6379")
    if err != nil {
        log.Fatalln(err)
    }

    // common(&conn)
    subscribe(&conn)
}

// 普通命令
func common(conn *net.Conn) {
    writer := redis.NewRESPWriter(*conn)
    err := writer.WriteCommand("get", "a")
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }

    reader := redis.NewReader(*conn)
    res, err := reader.ReadObject()
    if err != nil {
        log.Println(err)
        os.Exit(1)
    }

    fmt.Printf("%q\n", res)
}

// 发布跟订阅要在不同的会话
func publish() {
    conn, err := net.Dial("tcp", "localhost:6379")
    if err != nil {
        log.Fatalln(err)
    }

    writer := redis.NewRESPWriter(conn)

    for i := 0; i < 100; i++ {
        time.Sleep(time.Second * 3)
        err := writer.WriteCommand("publish", "chan", strconv.Itoa(i))
        if err != nil {
            log.Println("publish: ", err)
        }
    }
}

// 订阅
func subscribe(conn *net.Conn) {
    done := make(chan bool, 1)
    go func() {
        writer := redis.NewRESPWriter(*conn)
        err := writer.WriteCommand("subscribe", "chan")
        if err != nil {
            log.Println("subscribe: ", err)
            os.Exit(1)
        }

        for {
            reader := redis.NewReader(*conn)
            res, err := reader.ReadObject()
            if err != nil {
                log.Println("read: ", err)
                continue
            }

            fmt.Printf("%q\n", res)
            time.Sleep(time.Second)
        }
    }()

    // 另开了一会话才可以,不能同一个会话既发送又订阅
    publish()

    <-done
}

由于本文只是为了简单说明 Go 实现 Redis 客户端的原理,所以仅仅实现了部分功能,代码也比较简陋。如果有兴趣,可以继续深入学习,本文就介绍到这。

补充2:Redis 常用命令原生请求

以下用#号作为注释,实际上在 telnet 中并不能这么做,以下只是方便区分命令和解释。

数据存取

字符串(String)

$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.

# set key value,设置 key 对应 value,字符串
*3
$3
set
$4
name
$5
value

# get key,获取 key 对应的 value,字符串
*2
$3
get
$4
name

列表(List)

# rpush list value,对应的还有 lpush,列表
*3
$5
rpush
$6
mylist
$5
value

# lrange list start stop,返回指定子列表
*4
$6
lrange
$6
mylist
$1
0
$2
-1

# rpop list,对应的还有 lpop,弹出一个列表元素
*2
$4
rpop
$6
mylist

哈希(Hash)

# hset hashname key value,设置 hash
*4
$4
hset
$8
hashname
$7
hashkey
$9
hashvalue

# hget hashname key,获取 hash
*3
$4
hget
$8
hashname
$7
hashkey

# hgetall hashname,获取整个 hash
*2
$7
hgetall
$8
hashname

集合(Set)

# sadd setname value [value...],集合设置
*3
$4
sadd
$4
setname
$5
value

# smembers setname,集合所有值
*2
$8
smembers
$7
setname

# sismember setname value,判断 value 是否为集合成员
*3
$9
sismember
$7
setname
$5
value

有序集合(sorted set)

# zadd setname sortvalue value,有序集合
*4
$4
zadd
$7
setname
$1
8
$5
value

# zrange setname start stop,有序集合子集,zrevrange 是倒序
*4
$6
zrange
$7
setname
$1
0
$2
-1

发布订阅

# 发布
*3
$7
publish
$12
channel_name
$7
message

# 订阅/创建 频道
*2
$9
subscribe
$12
channel_name

额外操作

# exists key [key...],例如判断 hash 数据是否存在
*3
$6
exists
$1
a
$3
aaa

# del key,删除某个 key-value 对,del key
*2
$3
del
$1
a

# type key,判断某个 key 对应的 value 是什么类型,type key
*2
$4
type
$5
hasha

# keys pattern,获取符合某种模式的所有 key,keys pattern
*2
$3
keys
$3
h*a

# randomkey,随机获得一个已经存在的 key
*1
$9
randomkey

# dbsize,获取当前数据库 key 的总数
*1
$6
dbsize

# rename oldname newname,更改 key 名称,newname 如果存在将被覆盖
*3
$6
rename
$1
a
$4
haha

# renamenx oldname newname,更改 key 名称,newname 如果存在则更新失败
*3
$8
renamenx
$1
a
$4
haha

# expire key seconds,限定 key 的生存时间(秒),超过该时间会自动删除
*3
$6
expire
$1
a
$2
10

# ttl key,查询 key 还有多长时间过期(秒)
*2
$3
ttl
$1
a

# flushdb,清空当前数据库中所有 key
*1
$7
flushdb

# flushall,清空所有数据库中所有 key
*1
$8
flushall

# info [section],查询 redis 相关信息
*1
$4
info

Search

    Table of Contents