frp 是一款专注于内网穿透的、高性能的反向代理工具,做 Web 安全以及网络开发时常用。就算你没接触过这些领域,室友开黑打游戏时,也可能会使用 Sakura FRP 做内网穿透实现局域网联机。
v0.1.0 版本 v0.1.0 版本的代码量很小,核心功能代码就 1079 行,项目结构也很简单,适合上手。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 PROJECT: └─frp ├─conf ├─Godeps └─src └─frp ├─cmd │ ├─frpc │ └─frps ├─models │ ├─client | 客户端配置信息 & │ ├─consts │ ├─msg | 定义请求/响应信息的结构体 │ └─server | 服务端配置信息 & 处理代理服务器的函数 └─utils ├─broadcast | ├─conn | 设置本地监听器 & 获取/发起连接的函数 ├─log | 日志处理 └─pcrypto
在开始之前,先了解 Godeps,一个旧时代的包依赖管理工具,原理是扫描记录版本控制的信息,并在 go 命令前加壳以实现依赖管理。不过后来逐渐被大家更为熟知的 go mod
取代了,其 GitHub 仓库也被 archive 了。
项目中使用该工具管理 Go 依赖库时,会在项目中生成 /Godeps/Godeps.json
配置文件。这里就不在解释了,自己看看就可以:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 { "ImportPath" : "frp" , "GoVersion" : "go1.4" , "Packages" : [ "./..." ], "Deps" : [ { "ImportPath" : "github.com/astaxie/beego/logs" , "Comment" : "v1.5.0-9-gfb7314f" , "Rev" : "fb7314f8ac86b83ccd34386518d97cf2363e2ae5" }, { "ImportPath" : "github.com/vaughan0/go-ini" , "Rev" : "a98ad7ee00ec53921f08832bc06ecf7fd600e6a1" } ] }
frp 分客户端(frpc)和服务端(frps),使用时是先建立服务端再建立客户端,就按照这个顺序。
frps 对应的配置文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 [common] bind_addr = 0.0 .0.0 bind_port = 7000 log_file = ./frps.loglog_level = debuglog_way = console [test1] passwd = 123 bind_addr = 0.0 .0.0 listen_port = 6000
先看项目入口文件,位置是 src/frp/cmd/frps/main.go
:
1 2 3 4 5 6 7 8 9 10 11 12 func main () { err := server.LoadConf("./frps.ini" ) log.InitLog(server.LogWay, server.LogFile, server.LogLevel) l, err := conn.Listen(server.BindAddr, server.BindPort) log.Info("Start frps success" ) ProcessControlConn(l) }
调用 src/frp/models/server/config.go
中的 LoadConf
方法加载配置文件;初始化日志记录,使用 server 包中定义的日志记录方式、日志文件和日志级别;建立连接;在前面无误的情况下打印 success 信息;最后调用 ProcessControlConn
函数,处理控制连接。
然后看 src/frp/models/server/config.go
。
这个文件的功能就是使用 github.com/vaughan0/go-ini
库将配置文件信息映射为结构体,方便后续使用。配置文件可分为两部分,[common]
通用信息和代理服务器信息列表,其中 [common]
如果没在配置文件中指定,就是用该文件开头指定的默认值。其他配置信息都被当做代理服务器信息循环读取,文件开头初始化了一个 ProxyServers
映射,用于存储代理服务器的配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 var ProxyServers map [string ]*ProxyServer = make (map [string ]*ProxyServer)func LoadConf (confFile string ) (err error ) { for name, section := range conf { if name != "common" { proxyServer := &ProxyServer{} proxyServer.Name = name proxyServer.Passwd, ok = section["passwd" ] if !ok { return fmt.Errorf("Parse ini file error: proxy [%s] no passwd found" , proxyServer.Name) } proxyServer.BindAddr, ok = section["bind_addr" ] if !ok { proxyServer.BindAddr = "0.0.0.0" } portStr, ok := section["listen_port" ] if ok { proxyServer.ListenPort, err = strconv.ParseInt(portStr, 10 , 64 ) if err != nil { return fmt.Errorf("Parse ini file error: proxy [%s] listen_port error" , proxyServer.Name) } } else { return fmt.Errorf("Parse ini file error: proxy [%s] listen_port not found" , proxyServer.Name) } proxyServer.Init() ProxyServers[proxyServer.Name] = proxyServer } } if len (ProxyServers) == 0 { return fmt.Errorf("Parse ini file error: no proxy config found" ) } }
日志部分使用的是 github.com/astaxie/beego/logs
三方库。可以不细看,日志部分所需的配置信息由上面所说的配置文件指定的。
再看 src/frp/utils/conn/conn.go
,这部分功能是实现服务端与客户端的连接。
文件开头定义了一个 Listener
结构体,用于存储 TCP 监听器的地址、监听器对象、连接通道和关闭标志。
1 2 3 4 5 6 type Listener struct { addr net.Addr l *net.TCPListener conns chan *Conn closeFlag bool }
Listen
函数用于在指定端口启动一个 TCP 监听器,接受绑定的地址和端口号作为参数(配置文件中指定),并返回一个 Listener
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func Listen (bindAddr string , bindPort int64 ) (l *Listener, err error ) { tcpAddr, err := net.ResolveTCPAddr("tcp4" , fmt.Sprintf("%s:%d" , bindAddr, bindPort)) if err != nil { return l, err } listener, err := net.ListenTCP("tcp" , tcpAddr) if err != nil { return l, err } l = &Listener{ addr: listener.Addr(), l: listener, conns: make (chan *Conn), closeFlag: false , } go func () { for { conn, err := l.l.AcceptTCP() if err != nil { if l.closeFlag { return } continue } c := &Conn{ TcpConn: conn, closeFlag: false , } c.Reader = bufio.NewReader(c.TcpConn) l.conns <- c } }() return l, err }
返回的 Listener
是一个准备好接受连接的结构体。这个结构体包含了一个通道(conns
),用于接收和存储新的连接。在 Listen
函数内部,有一个协程不断地接受新的连接并将它们放入这个通道。
我们的目光再返回主函数。主函数建立好上文说的这个 l
后,就认为 frp 已经成功启动了,然后将这个 l
传递给控制模块进行管理,控制模块主体在 src/frp/cmd/frps/control.go
中。
ProcessControlConn
函数启动一个无限循环,调用 l.GetConn()
方法尝试从监听器获取一个新的连接对象 c
:如果成功获取到新的连接,它将为每个新的连接启动一个新的协程 controlWorker(c)
,这个协程将独立处理每个连接的控制逻辑;如果 l.GetConn()
返回错误,函数将停止运行。
这里的连接对象 c
在项目的 utils/conn
中定义:
1 2 3 4 5 type Conn struct { TcpConn *net.TCPConn Reader *bufio.Reader closeFlag bool }
Conn
结构体是一个自定义的数据类型,用于封装 TCP 连接及其相关操作:
TcpConn
:指向 net.TCPConn
的指针,它代表了底层的 TCP 连接。通过这个字段,可以访问和控制 TCP 网络连接的各种属性和方法,如发送和接收数据。
Reader
:指向 bufio.Reader
的指针,它提供了一个缓冲区,可以更高效地读取数据。这个读取器封装了 TcpConn
,使得可以方便地进行如按行读取文本等操作。
closeFlag
:布尔值,用于指示连接是否已经被关闭。如果为 true
,则表示连接已经关闭,不应再进行读写操作。
然后再看用于处理连接的函数 controlWorker(c)
,其中有两个较为关键的函数,均在注释中标注了 ★
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 func controlWorker (c *conn.Conn) { res, err := c.ReadLine() if err != nil { log.Warn("Read error, %v" , err) return } log.Debug("get: %s" , res) clientCtlReq := &msg.ClientCtlReq{} clientCtlRes := &msg.ClientCtlRes{} if err := json.Unmarshal([]byte (res), &clientCtlReq); err != nil { log.Warn("Parse err: %v : %s" , err, res) return } succ, info, needRes := checkProxy(clientCtlReq, c) if !succ { clientCtlRes.Code = 1 clientCtlRes.Msg = info } if needRes { defer c.Close() buf, _ := json.Marshal(clientCtlRes) err = c.Write(string (buf) + "\\n" ) if err != nil { log.Warn("Write error, %v" , err) time.Sleep(1 * time.Second) return } } else { return } s, ok := server.ProxyServers[clientCtlReq.ProxyName] if !ok { log.Warn("ProxyName [%s] is not exist" , clientCtlReq.ProxyName) return } go readControlMsgFromClient(s, c) serverCtlReq := &msg.ClientCtlReq{} serverCtlReq.Type = consts.WorkConn for { closeFlag := s.WaitUserConn() if closeFlag { log.Debug("ProxyName [%s], goroutine for dealing user conn is closed" , s.Name) break } buf, _ := json.Marshal(serverCtlReq) err = c.Write(string (buf) + "\\n" ) if err != nil { log.Warn("ProxyName [%s], write to client error, proxy exit" , s.Name) s.Close() return } log.Debug("ProxyName [%s], write to client to add work conn success" , s.Name) } log.Info("ProxyName [%s], I'm dead!" , s.Name) return }
checkProxy
函数首先检验代理是否合法,然后通过检查 req.Type
的值来区分控制连接和工作连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 func checkProxy (req *msg.ClientCtlReq, c *conn.Conn) (succ bool , info string , needRes bool ) { succ = false needRes = true s, ok := server.ProxyServers[req.ProxyName] if !ok { info = fmt.Sprintf("ProxyName [%s] is not exist" , req.ProxyName) log.Warn(info) return } if req.Passwd != s.Passwd { info = fmt.Sprintf("ProxyName [%s], password is not correct" , req.ProxyName) log.Warn(info) return } if req.Type == consts.CtlConn { if s.Status != consts.Idle { info = fmt.Sprintf("ProxyName [%s], already in use" , req.ProxyName) log.Warn(info) return } err := s.Start() if err != nil { info = fmt.Sprintf("ProxyName [%s], start proxy error: %v" , req.ProxyName, err.Error()) log.Warn(info) return } log.Info("ProxyName [%s], start proxy success" , req.ProxyName) } else if req.Type == consts.WorkConn { needRes = false if s.Status != consts.Working { log.Warn("ProxyName [%s], is not working when it gets one new work conn" , req.ProxyName) return } s.GetNewCliConn(c) } else { info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport" , req.ProxyName, req.Type) log.Warn(info) return } succ = true return }
其中控制连接主要用于设置和维护工作连接,以及进行心跳检测来确保客户端仍然活跃。
控制连接是客户端与服务器之间的初始连接,用于管理和控制代理的状态。在控制连接中,客户端会发送一个包含代理名称和密码的请求,服务器会验证这些信息,如果验证成功,服务器就会根据请求的类型来启动代理或处理其他控制消息。
而工作连接是在控制连接建立后由客户端发起的,用于实际的数据传输。工作连接允许用户数据通过代理服务器转发,实现客户端和服务端之间的通信。
readControlMsgFromClient
函数用于读取客户端发送的控制信息 ,它接受一个代理服务器和连接作为参数,用于读取客户端发送的控制消息,包括对心跳信息的处理,用于保持客户端和服务端的连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 func readControlMsgFromClient (s *server.ProxyServer, c *conn.Conn) { isContinueRead := true f := func () { isContinueRead = false s.Close() log.Error("ProxyName [%s], client heartbeat timeout" , s.Name) } timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f) defer timer.Stop() for isContinueRead { content, err := c.ReadLine() if err != nil { if err == io.EOF { log.Warn("ProxyName [%s], client is dead!" , s.Name) s.Close() break } else if nil == c || c.IsClosed() { log.Warn("ProxyName [%s], client connection is closed" , s.Name) break } log.Error("ProxyName [%s], read error: %v" , s.Name, err) continue } clientCtlReq := &msg.ClientCtlReq{} if err := json.Unmarshal([]byte (content), clientCtlReq); err != nil { log.Warn("Parse err: %v : %s" , err, content) continue } if consts.CSHeartBeatReq == clientCtlReq.Type { log.Debug("ProxyName [%s], get heartbeat" , s.Name) timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) clientCtlRes := &msg.ClientCtlRes{} clientCtlRes.GeneralRes.Code = consts.SCHeartBeatRes response, err := json.Marshal(clientCtlRes) if err != nil { log.Warn("Serialize ClientCtlRes err! err: %v" , err) continue } err = c.Write(string (response) + "\\n" ) if err != nil { log.Error("Send heartbeat response to client failed! Err:%v" , err) continue } } } }
frpc ini
配置文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 [common] server_addr = 127.0 .0.1 server_port = 7000 log_file = ./frpc.loglog_level = debuglog_way = console[test1] passwd = 123 local_port = 22
同样是入口文件开始,位置是 src/frp/cmd/frpc/main.go
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func main () { err := client.LoadConf("./frpc.ini" ) if err != nil { os.Exit(-1 ) } log.InitLog(client.LogWay, client.LogFile, client.LogLevel) var wait sync.WaitGroup wait.Add(len (client.ProxyClients)) for _, client := range client.ProxyClients { go ControlProcess(client, &wait) } log.Info("Start frpc success" ) wait.Wait() log.Warn("All proxy exit!" ) }
加载配置文件和日志初始化都讲过了,不再赘述。将目光移至 src/frp/cmd/frpc/control.go
,看看用于处理客户端的 ControlProcess()
。其中关键函数是用于连接服务端的 loginToServer
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 func ControlProcess (cli *client.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() c, err := loginToServer(cli) if err != nil { log.Error("ProxyName [%s], connect to server failed!" , cli.Name) return } connection = c defer connection.Close() for { content, err := connection.ReadLine() if err == io.EOF || nil == connection || connection.IsClosed() { log.Debug("ProxyName [%s], server close this control conn" , cli.Name) var sleepTime time.Duration = 1 for { log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]..." , cli.Name, client.ServerAddr, client.ServerPort) tmpConn, err := loginToServer(cli) if err == nil { connection.Close() connection = tmpConn break } if sleepTime < 60 { sleepTime = sleepTime * 2 } time.Sleep(sleepTime * time.Second) } continue } else if err != nil { log.Warn("ProxyName [%s], read from server error, %v" , cli.Name, err) continue } clientCtlRes := &msg.ClientCtlRes{} if err := json.Unmarshal([]byte (content), clientCtlRes); err != nil { log.Warn("Parse err: %v : %s" , err, content) continue } if consts.SCHeartBeatRes == clientCtlRes.GeneralRes.Code { if heartBeatTimer != nil { log.Debug("Client rcv heartbeat response" ) heartBeatTimer.Reset(time.Duration(client.HeartBeatTimeout) * time.Second) } else { log.Error("heartBeatTimer is nil" ) } continue } cli.StartTunnel(client.ServerAddr, client.ServerPort) } }
接下来是 loginToServer
函数,核心函数有两个,分别是用于连接到指定地址端口的 ConnectServer
和用于启动心跳检测的 startHeartBeat
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func loginToServer (cli *client.ProxyClient) (c *conn.Conn, err error ) { c, err = conn.ConnectServer(client.ServerAddr, client.ServerPort) if err != nil { log.Error("ProxyName [%s], connect to server [%s:%d] error, %v" , cli.Name, client.ServerAddr, client.ServerPort, err) return } req := &msg.ClientCtlReq{ Type: consts.CtlConn, ProxyName: cli.Name, Passwd: cli.Passwd, } buf, _ := json.Marshal(req) err = c.Write(string (buf) + "\n" ) if err != nil { log.Error("ProxyName [%s], write to server error, %v" , cli.Name, err) return } res, err := c.ReadLine() if err != nil { log.Error("ProxyName [%s], read from server error, %v" , cli.Name, err) return } log.Debug("ProxyName [%s], read [%s]" , cli.Name, res) clientCtlRes := &msg.ClientCtlRes{} if err = json.Unmarshal([]byte (res), &clientCtlRes); err != nil { log.Error("ProxyName [%s], format server response error, %v" , cli.Name, err) return } if clientCtlRes.Code != 0 { log.Error("ProxyName [%s], start proxy error, %s" , cli.Name, clientCtlRes.Msg) return c, fmt.Errorf("%s" , clientCtlRes.Msg) } go startHeartBeat(c) log.Debug("ProxyName [%s], connect to server[%s:%d] success!" , cli.Name, client.ServerAddr, client.ServerPort) return }
先看实现连接的ConnectServer
函数,位于 utils/conn
。这个函数实际上是对普通 TCP 连接的封装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 type Conn struct { TcpConn *net.TCPConn Reader *bufio.Reader closeFlag bool } func ConnectServer (host string , port int64 ) (c *Conn, err error ) { c = &Conn{} servertAddr, err := net.ResolveTCPAddr("tcp4" , fmt.Sprintf("%s:%d" , host, port)) if err != nil { return } conn, err := net.DialTCP("tcp" , nil , servertAddr) if err != nil { return } c.TcpConn = conn c.Reader = bufio.NewReader(c.TcpConn) c.closeFlag = false return c, nil }
然后看 startHeartBeat
函数,该函数用于发起心跳包维持客户端与服务端的连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func startHeartBeat (c *conn.Conn) { f := func () { log.Error("HeartBeat timeout!" ) if c != nil { c.Close() } } heartBeatTimer = time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, f) defer heartBeatTimer.Stop() clientCtlReq := &msg.ClientCtlReq{ Type: consts.CSHeartBeatReq, ProxyName: "" , Passwd: "" , } request, err := json.Marshal(clientCtlReq) if err != nil { log.Warn("Serialize clientCtlReq err! Err: %v" , err) } log.Debug("Start to send heartbeat" ) for { time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second) if c != nil && !c.IsClosed() { err = c.Write(string (request) + "\n" ) if err != nil { log.Error("Send hearbeat to server failed! Err:%v" , err) continue } } else { break } } log.Debug("Heartbeat exit" ) }
现在,我们实现了客户端与服务端的连接,并在连接后依靠心跳机制实现了维持通信,下一步就是建立隧道,实现通信功能。视角转到 utils/conn
的 StartTunnel
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (p *ProxyClient) StartTunnel(serverAddr string , serverPort int64 ) (err error ) { localConn, err := p.GetLocalConn() if err != nil { return } remoteConn, err := p.GetRemoteConn(serverAddr, serverPort) if err != nil { return } log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])" , localConn.GetLocalAddr(), localConn.GetRemoteAddr(), remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr()) go conn.Join(localConn, remoteConn) return nil }
建立到本地服务的连接没什么好说的,获取配置文件中的本地端口做个调用即可。着重看下 GetRemoteConn
函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (p *ProxyClient) GetRemoteConn(addr string , port int64 ) (c *conn.Conn, err error ) { defer func () { if err != nil { c.Close() } }() c, err = conn.ConnectServer(addr, port) if err != nil { log.Error("ProxyName [%s], connect to server [%s:%d] error, %v" , p.Name, addr, port, err) return } req := &msg.ClientCtlReq{ Type: consts.WorkConn, ProxyName: p.Name, Passwd: p.Passwd, } buf, _ := json.Marshal(req) err = c.Write(string (buf) + "\n" ) if err != nil { log.Error("ProxyName [%s], write to server error, %v" , p.Name, err) return } err = nil return }
最后我们再转到 Join
函数,这个函数是搭建隧道的关键,本质上就是实现两个连接之间的双向数据传输:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func Join (c1 *Conn, c2 *Conn) { var wait sync.WaitGroup pipe := func (to *Conn, from *Conn) { defer to.Close() defer from.Close() defer wait.Done() var err error _, err = io.Copy(to.TcpConn, from.TcpConn) if err != nil { log.Warn("join conns error, %v" , err) } } wait.Add(2 ) go pipe(c1, c2) go pipe(c2, c1) wait.Wait() return }
后话 Ok,至此,frp v0.1.0 的源码就分析的差不多了,虽说有点老(接近 10 年前),但对理解内网穿透的实现原理也是很有帮助的。之后的版本中虽然做了很多改进,但整体实现思路其实没有太大变化。
虽说不是第一次试着去阅读某个工具的源码,但边学边写博文还是第一次,可能略有混乱,但感觉也还好?唯一的问题是不适合把一段代码拆开,以 frpc 的 controlWorker
举例,里面先后使用了两个重要函数,一般这种情况下,我会按照代码执行逻辑先跳转到对应函数看看这个函数做了什么,看完之后再。但写文章的时候拆开整个函数,先贴前半部分,中间再塞调用的函数,然后再贴后半段函数,就感觉很怪,所以在这篇文章里,我就直接先介绍整个函数,标注其中调用的重要函数,然后在之后单独介绍。
然后通篇写下来,想要吐槽的是 Golang 的错误处理机制,不论是阅读的时候还是写文章的时候都感到略有麻烦。就是那种,大家都知道按照 Golang 的风格会在操作跟一个错误处理,控制日志模块输出些信息,多数情况下对核心功能的实现没什么帮助,但你又不好省略,因为就多出两三行,贴代码的时候用 // ... ...
代替效果也有限,但直接省去不写也感觉很不对。就比较难受。
别的都还好。
我能找到的还有一篇 FRP v0.5.0 源码阅读的文章,感兴趣的可以跳过去看看,也挺不错的:
https://www.joxrays.com/frp-source-code/