Erlang服务程序和OTP基础#

  • 之前已经学过erlang的基本概念和操作。 http://xiongchen.cc/post/29

  • 这次学习实际的应用和OTP的基础。 api文档 https://erldocs.com/ https://www.erlang.org/docs

  • 环境 erlang版本用的最新的otp24.0。 ide用的是Intellij idea 2021.2社区版。 ide进设置,安装erlang插件。 创建project,选erlang。完了开始调代码。


基本的socket流程#

TCP服务器#

  • erlang的gen_tcp提供tcp socket的接口。 可用gen_tcp来实现完整的tcp服务程序。 gen代表generic。

src目录下新建erlang module,起名为tcp_app。

-module(tcp_app).
-author("xc").

%% API
-export([start_server/1, send_data/3]). % 暴露两个接口给外部调用,后面的数字表示参数个数。

loop(Socket) ->
    io:format("server loop ~n"),
    
    % 收消息
    receive
        % 匹配各种消息
        {tcp_closed, Socket} -> % socket关闭
            io:format("server recv tcp_closed ~n");
        {tcp, Socket, Data} -> % 正常的tcp数据
            io:format("server recv ~p~n", [Data]),

            % 发数据
            gen_tcp:send(Socket, Data),

            % active once模式。每次收到消息后要主动设置一下,才会收到后续消息。
            inet:setopts(Socket, [{active, once}]),

            % 继续loop
            loop(Socket)
    end.

start_accept(Listen) ->
    io:format("server start_accept ~n"),

    % 开始accept。一旦成功马上另起一个accept进程。自己继续处理连上来的socket。
    {ok, Socket} = gen_tcp:accept(Listen),
    spawn(fun() -> start_accept(Listen) end), 
    loop(Socket).

start_server(Port) -> % 启动服务器的入口
    io:format("server start_server ~n"),

    % 标准的socket流程。先listen。第二个参数是各种选项。binary表示数据包是以二进制数据展示。
    % {packet, N}是erlang处理tcp包分包的一个机制,
    % {active, once}。socket是active还是passive。可选true,false,once

    {ok, Listen} = gen_tcp:listen(Port, [binary, {packet, 0}, {active, once}]),

    % listen成功后开始start_accept
    spawn(fun() -> start_accept(Listen) end). 

send_data(Host, Port, Data) ->
    % 客户端连接服务器连接ip端口。
    {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),

    % 发一条数据并接收
    ok = gen_tcp:send(Socket, Data),
    receive
        Any ->
            io:format("client recv ~p~n", [Any])
    end.

有两个重要选项

  • {packet,  0 | 1 | 2 | 4 | raw | sunrm | asn1 | cdr | fcgi | line | tpkt | http | httph | http_bin | httph_bin} | https://erldocs.com/current/kernel/inet.html#packet 定义数据包的格式。

    选0不处理数据。

    选1、2、4。数据包里会添加一个1或2或4字节的header,包含实际数据的长度。4字节可表示2G字节的数据包。

    实际的作用:对于tcp数据,可能一次收发不全一条完整的数据,或者多收部分数据。

    这样造成接收端分不清数据边界,不能直接解析数据。

    所以我们都要定一个数据包的格式协议,确定消息的边界,这样收发两端才能正确通信。

    这个选项其实就是一个最简单的协议,把每条完整数据的长度定死。

    这样buffer里每次累积到协定长度的数据,就认为是一个完整的数据包。这样loop里每次receive的数据就是完整的一条数据了,不用再费劲组包了。

    还包含http等其他内置协议。如果是自定义协议还是得自己实现。

  • {active, true | false | once | -32768..32767} | https://erldocs.com/current/kernel/inet.html#setopts/2

    socket是active还是passive。可选true,false,once。这是erlang控制数据包的获取的一个机制。

    如果是true,loop进程的控制进程不会进行控制,只要有数据就会推到loop里。

    如果是false,loop必须调gen_tcp:recv才能收到数据。

    如果是once,loop每次收到数据后必须调一次inet:setopts(Sock, [{active, once}])才能再次收到一个消息。 还可以设为一个数N,默认收N条数据。


测试程序#

同样src目录下新建erlang module,起名为app。

-module(app).
-author("xc").

%% API
-export([gg/0]).
-import(tcp_app, [start_server/1, send_data/1]).

gg()->
    io:format("ggwtf~n"),
    tcp_app:start_server(9999),
    tcp_app:send_data("127.0.0.1", 9999, "wtf1"),
    tcp_app:send_data("127.0.0.1", 9999, "wtf2"),
    tcp_app:send_data("127.0.0.1", 9999, "wtf3").

暴露的接口为gg。

run的配置里module and function里填上”app gg”。即运行app模块的gg函数。再run即可看到打印。

ide里build后在out/production/xxx/目录会生成所有代码的beam文件。

可以进这个文件夹运行erlang的命令行程序erl进行操作。类似python的idle shell。

进shell直接运行 tcp_app:start_server(9999). 等等。


UDP服务器#

erlang的gen_udp提供udp socket的接口。

可用gen_udp来实现完整的udp服务程序。

src目录下新建erlang module,起名为udp_app。

udp比tcp简单一些。

-module(udp_app).
-author("xc").

%% API
-export([start_server/0, send_data/1]).

start_server() ->
    spawn(fun() -> do_start_server(9999) end).

loop(Socket) ->
    receive
        {udp, Socket, Host, Port, Bin} = Msg ->
            io:format("server recv ~p~n", [Msg]),
            gen_udp:send(Socket, Host, Port, Bin),
            loop(Socket)
    end.

do_start_server(Port) ->
    {ok, Socket} = gen_udp:open(Port, [binary]),
    io:format("server opened socket ~p~n", [Socket]),
    loop(Socket).

send_data(Data) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    ok = gen_udp:send(Socket, "127.0.0.1", 9999, Data),

    receive
        {udp, Socket, Host, Port, D} ->
            io:format("client recv ~p ~p ~p ~n", [Host, Port, D]);
        Any ->
            io:format("client recv ~p~n", [Any])
    end.

OTP#

  • OTP全称Open Telecom Platform。这个名称会造成误解。

    其实它包括一系列的框架、库、工具,可用来搭建大型、可扩展、可动态升级、可容灾的高并发分布式通信系统。

  • OTP的核心思想是所谓的OTP behavior,其实就是抽象的框架。

    有些常用的程序架构比如C/S、状态机、事件、监控等等。把他们做成程序框架,就可以方便地复用了。

  • 下面要学习最常用的gen_server。

    为更好地理解otp,从最简单的抽象开始自己写代码,一点点加功能,最后接近系统自带的gen_server。

server1#

-module(server1).
-author("xc").

%% API
-export([start/2, rpc/2]).

loop(Name, Mod, State) ->
    io:format("loop~n"),

    % 收消息
    receive
        {From, Request} ->
            % 调用应用模块的handle函数
            {Response, State1} = Mod:handle(Request, State),

            % 发送结果
            From ! {Name, Response},

            % 继续loop
            loop(Name, Mod, State1)
    end.

start(Name, Mod) ->
    % 把名字和进程绑定,后续就能直接给这个名字发消息。
    register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

rpc(Name, Request) ->
    io:format("recv rpc~n"),

    % 给loop进程发消息
    Name ! {self(), Request},

    % 收回复
    receive
        {Name, Response} ->
            Response
    end.
  • server1大致是接收一个应用模块(Mod),Mod有一个handle函数可处理实际业务。

    起进程不断收消息。消息的来源是rpc。

    收到消息时调用应用模块的handle函数,把结果返给rpc,rpc再返给调用者。

  • 总结一下,思想就是实现一个通用的框架,接收一个应用模块,规定好应用模块handle的参数和返回格式。

    应用模块只管实现自己的业务。server1负责实现消息的传递、rpc和进程管理。

    这也是otp的主要思想,实现通用的底层设施,提供给应用模块。应用模块专心处理业务就好。

-module(name_server).
-author("xc").

%% API
-export([]).
-import(server1, [rpc/2]).
-export([init/0, add/2, find/1, handle/2, start/0]).

start() ->
    % 起服务
    server1:start(name_server, name_server).

add(Name, Place) ->
    % 发rpc
    Result = rpc(name_server, {add, Name, Place}),
    io:format("add result ~p~n", [Result]).

find(Name) ->
    % 发rpc
    Result = rpc(name_server, {find, Name}),
    io:format("find result ~p~n", [Result]).

init() ->
    % 业务初始化
    dict:new().

% 实现业务
handle({add, Name, Place}, Dict) ->
    io:format("handle1~n"),
    {ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict) ->
    io:format("handle2~n"),
    {dict:find(Name, Dict), Dict}.
  • 应用模块按规定实现handle。接收两个参数,Request和State,处理Request,返回新的State。

    这里State是一个dict,某种意义上也可以说是一种状态。

  • 这里实现的是一个kv存储。

    应用模块暴露给最终用户的api基本都是业务相关。直接发rpc,带上Request。

    name_server:start(),
    name_server:add(gg, "wtf"),
    name_server:find(wtf),
    name_server:find(gg).

server2#

  • server1没有错误处理。如果业务模块出了异常,整个server完蛋。

    在name_server的handle里加一个throw(“wtfffff”)即可复现。

  • server2添加了错误处理。对业务模块的handle进行catch。

    如果出了异常,可以报告给业务模块。并且保持老的State,继续loop。

-module(server2).
-author("xc").

%% API
-export([start/2, rpc/2]).

loop(Name, Mod, OldState) ->
    io:format("loop~n"),

    % 收消息
    receive
        {From, Request} ->
            % catch应用模块的handle函数
            try Mod:handle(Request, OldState) of
                {Response, NewState} ->
                    % 发送结果
                    From ! {Name, ok, Response},

                    % 继续loop
                    loop(Name, Mod, NewState)
            catch
                _:Why ->
                    io:format("Server ~p request ~p ~ncaused exception ~p~n", [Name, Request, Why]),
                    % 报告crash
                    From ! {Name, crash},
                    % 继续loop
                    loop(Name, Mod, OldState)
            end
    end.

start(Name, Mod) ->
    % 把名字和进程绑定,后续就能直接给这个名字发消息。
    register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

rpc(Name, Request) ->
    io:format("recv rpc~n"),

    % 给loop进程发消息
    Name ! {self(), Request},

    % 收回复
    receive
        {Name, crash} -> crash; %exit(rpc);
        {Name, ok, Response} -> Response
    end.

server3#

  • 当前的name_server只有add和find两个业务功能。

    如果要加新功能,通常需要更新name_server的代码,重启服务。

    erlang可以实现热更新代码,不用重启服务。原理就是把传给server框架的业务模块直接实时替换。

先做一个新的name_server_v2。新加delete和all_names功能。

-module(name_server_v2).
-author("xc").

%% API
-export([]).
-import(server2, [start/2, rpc/2]).
-export([init/0, add/2, find/1, handle/2, start/0, delete/1, all_names/0]).

start() ->
    % 起服务
    server2:start(name_server, name_server_v2).

add(Name, Place) ->
    % 发rpc
    Result = rpc(name_server, {add, Name, Place}),
    io:format("add result ~p~n", [Result]).

find(Name) ->
    % 发rpc
    Result = rpc(name_server, {find, Name}),
    io:format("find result ~p~n", [Result]).

delete(Name) ->
    % 发rpc
    Result = rpc(name_server, {delete, Name}),
    io:format("delete result ~p~n", [Result]).

all_names() ->
    % 发rpc
    Result = rpc(name_server, {all_names}),
    io:format("all_names result ~p~n", [Result]).

init() ->
    % 业务初始化
    dict:new().

% 实现业务
handle({add, Name, Place}, Dict) ->
    throw("wtfffff"),
    io:format("handle1~p~n", [Name]),
    {ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict) ->
    io:format("handle2~n"),
    {dict:find(Name, Dict), Dict};
handle({delete, Name}, Dict) ->
    io:format("handle3~p~n", [Name]),
    {ok, dict:erase(Name, Dict)};
handle({all_names}, Dict) ->
    io:format("handle4~n"),
    {dict:fetch_keys(Dict), Dict}.
  • 在server2基础上加一个简单的逻辑就能实现代码更新。 做一个新的函数swap_code接收新的模组,发rpc。 loop里匹配到swap_code时用新的模组起loop即可。

-module(server3).
-author("xc").

%% API
-export([start/2, rpc/2, swap_code/2]).

loop(Name, Mod, OldState) ->
    io:format("loop~n"),

    % 收消息
    receive
        {From, {swap_code, NewMod}} -> % 匹配swap_code
            From ! {Name, ok, ack},

            io:format("swap_code~n"),
            % 用新的Mod起loop实现代码更新。
            loop(Name, NewMod, OldState);
        {From, Request} ->
            % catch应用模块的handle函数
            try Mod:handle(Request, OldState) of
                {Response, NewState} ->
                    % 发送结果
                    From ! {Name, ok, Response},

                    % 继续loop
                    loop(Name, Mod, NewState)
            catch
                _:Why ->
                    io:format("Server ~p request ~p ~ncaused exception ~p~n", [Name, Request, Why]),
                    % 报告crash
                    From ! {Name, crash},
                    % 继续loop
                    loop(Name, Mod, OldState)
            end
    end.

start(Name, Mod) ->
    % 把名字和进程绑定,后续就能直接给这个名字发消息。
    register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

rpc(Name, Request) ->
    io:format("recv rpc~n"),

    % 给loop进程发消息
    Name ! {self(), Request},

    % 收回复
    receive
        {Name, crash} -> crash; %exit(rpc);
        {Name, ok, Response} -> Response
    end.

swap_code(Name, Mod) ->
    rpc(Name, {swap_code, Mod}).
  • 注意erlang里面有各种模式匹配。比如rpc函数收回复,如果发的消息匹配不了,rpc就会卡在那返回不了。

  • build项目后在erlang的Eshell里测试。

    % 先加载原始的name_server。
    server3:start(name_server, name_server).
    % 可用find和add。
    name_server:add(ggg, "wtf").
    % 如果调用delete可以看到报错,找不到函数。
    name_server:delete(ggg).
    % 这时更新代码。
    server3:swap_code(name_server, name_server_v2).
    % 之后可以用name_server_v2里新功能了。
    name_server_v2:all_names().
    name_server_v2:delete(ggg).
  • 这里面还有一些列问题 比如name_server和name_server_v2,调用的模组名字发生了改变。 而且发rpc的第一个参数都是用的name_server,造成老的name_server模组仍可使用。 暂且不管这些。

  • 到此已经了解了gen_server框架的思想。下面看看如何使用正式的gen_server。


OTP之gen_server#

  • ide里新建erlang文件,可选很多种类型。其中有OTP gen_server和OPT gen_server, minimal。

    这是erlang插件提供的代码模板,各创建一个对比一下。

  • 看minimal模板,可以看到样子和之前做的name_server差不多,都是做一些handle实现业务功能,还有启动、关闭、更新代码等辅助。

    非minimal的模板包含了较完整的信息,包括各种函数和参数的类型定义。

    minimal模板应该是给老手使用的。

  • 下面要实现一个简单的银行系统。为简洁起见在minimal模板上写代码,完整模板作为参考。

    银行系统需要存储数据,我们用erlang的ets(erlang term storage)。

    ets是erlang自带的跑在内存上的存储系统,简单强大,可存多种类型数据。

  1. 创建OPT gen_server, minimal,起名my_bank。

  2. 设计功能,定义接口。

    start() 启动银行

    stop() 关闭银行

    new_account(Who) 创建账户

    deposit(Who, Amount) 存钱

    withdraw(Who, Amount) 取钱

  3. 接口里定好rpc请求的格式,用 gen_server:call发给gen_server。

  4. handle_call里根据定好的rpc请求,实现各个业务。

-module(my_bank).

-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
    code_change/3]).

-define(SERVER, ?MODULE).

%-record(my_bank_state, {}).

-export([start/0, stop/0, new_account/1, deposit/2, withdraw/2]).

%%%===================================================================
%%% Spawning and gen_server implementation
%%%===================================================================

start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

% 业务初始化。创建ets。
init([]) -> {ok, ets:new(?MODULE, [])}.

% 实现所有业务
handle_call(stop, _From, Tab) ->
    {stop, normal, stopped, Tab};
handle_call({new, Who}, _From, Tab) ->
    % 统一先找用户。
    Reply = case ets:lookup(Tab, Who) of
                [] -> % 如果不存在。insert{名字, 初始金额}
                    ets:insert(Tab, {Who, 0}),
                    {welcome, Who};
                [_] -> % 如果存在
                    {Who, you_exists}
            end,
    {reply, Reply, Tab};
handle_call({add, Who, Amount}, _From, Tab) ->
    Reply = case ets:lookup(Tab, Who) of
                [] ->
                    {Who, no_account};
                [{Who, Balance}] ->
                    % 更新为新金额
                    NewB = Balance + Amount,
                    ets:insert(Tab, {Who, NewB}),
                    {ok}
            end,
    {reply, Reply, Tab};
handle_call({remove, Who, Amount}, _From, Tab) ->
    Reply = case ets:lookup(Tab, Who) of
                [] ->
                    {Who, no_account};
                [{Who, Balance}] when Amount =< Balance ->
                    % 更新为新金额
                    NewB = Balance - Amount,
                    ets:insert(Tab, {Who, NewB}),
                    {ok};
                [{Who, Balance}] ->
                    % 金额不足
                    {no_enough_mineral, Balance}
            end,
    {reply, Reply, Tab}.

handle_cast(_Request, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% 启动银行
start() -> start_link().

% 关闭银行
stop() -> gen_server:call(?MODULE, stop).

% 创建账户
new_account(Who) -> gen_server:call(?MODULE, {new, Who}).

% 存钱
deposit(Who, Amount) -> gen_server:call(?MODULE, {add, Who, Amount}).

% 取钱
withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).

在Eshell里进行各种测试

my_bank:start().
my_bank:withdraw("xc", 100).
my_bank:new_account("xc").
my_bank:new_account("xc").
my_bank:withdraw("xc", 100).
my_bank:deposit("xc", 111).
my_bank:withdraw("xc", 100).
my_bank:stop().

有个问题,如果调一个不存在的函数,会造成服务死掉。必须重新start。不知道原因,后续再研究。


总结#

  • 学习了erlang的socket流程。

  • 学习了erlang OTP的基本思想。

  • 熟悉了gen_server的基本使用。