Erlang OTP实践


上次学习了erlang OTP的基础知识。 这次用OTP做一些实验,了解OPT中常用的库,掌握OTP的实际用法。


gen_server

消息的传递

  • gen_server:call 同步发消息。会阻塞。handle_call来处理。

  • gen_server:cast 异步发消息。立即返回。handle_cast来处理。

  • 也可以自己发消息。handle_info来处理。

  • 未被处理的消息

    erlang进程从自己的消息邮箱中取消息,默认情况如果消息不符合代码的匹配规则,会造成消息永远留在邮箱中。最终资源耗尽。

    按OTP的设计,会抛出function_clause的异常。

    应当把消息不匹配的情况作为bug,不应该出现。

  • handle_info

    如果不用gen_server自带的call和cast等给gen_server发消息,会走handle_info回调。

    比如

    {ok, PPP} = gen_server:start_link({local, ggg}, ggg, [], [])

    PPP ! wtf

    这样在handle_info里可以收到wtf。

同步

  • 一个同步场景 server收到两个客户端消息,默认是处理完一个就发回复。 现在需要等两个消息都处理完才发。 解决办法是先返回{noreply, State},等时机成熟再用gen_server:reply(From, Reply)显式发消息。

  • 也可以收到消息时用gen_server:reply(From, ok),直接返回ok,再自己处理消息。

终止

init, handle_call, handle_cast, handle_info都可以返回{stop, Reason ….}来终止程序。

发stop后会走terminate(Reason, State)回调,做善后工作。

timeout

  • gen_server:call可以传一个timeout参数,指定最多阻塞等待的时间,默认5秒。 超时的话会抛出timeout异常。

  • 起一个gen_server模组。起名timeout。 handle_call实现{sleep, Ms}请求。sleep Ms毫秒。

handle_call({sleep, Ms}, _From, State = #timeout_state{}) ->
    timer:sleep(Ms),
    {reply, ok, State}.

进行一系列测试。6个命令。

$ erl
Eshell V12.0  (abort with ^G)
1> gen_server:start_link({local, timeout}, timeout, [], []).
{ok,<0.80.0>}
2> gen_server:call(timeout, {sleep, 1000}).
ok
3> catch gen_server:call(timeout, {sleep, 5001}).
{'EXIT',{timeout,{gen_server,call,[timeout,{sleep,5001}]}}}
4> flush().
ok
5> gen_server:call(timeout, {sleep, 5001}).
** exception exit: {timeout,{gen_server,call,[timeout,{sleep,5001}]}}
     in function  gen_server:call/2 (gen_server.erl, line 239)
6> catch gen_server:call(timeout, {sleep, 1000}).
{'EXIT',{noproc,{gen_server,call,[timeout,{sleep,1000}]}}}
7>
  • 命令1启动server。

  • 命令2请求sleep1秒。默认5秒才超时,所以不会超时,返回ok。

  • 命令3请求sleep5001毫秒,会超时抛出异常,catch住。

  • 不能把timeout当作call的结果,因为server因为繁忙产生timeout,后续还是会返回正常的结果。

  • 迟到的结果必须得到处理,不然消息会堆积在进程的邮箱里,最后内存爆炸。

  • 命令4 flush()可以清空邮箱。

  • 命令5不作catch。进程会崩溃。

  • 命令6再次调用时,由于之前timeout模组的gen_server已经崩溃,所以报noproc找不到进程。

死锁

如果server1阻塞call server2的某个函数,最后通过某种路径又阻塞call了server1的起始函数。就会死锁。 默认会造成超时,进程崩溃。erlang进程有一些列监控和关联机制,可能发生连锁反应。


sys库

https://erldocs.com/current/stdlib/sys.html

sys提供各种系统级的辅助api。

  • trace 打开关闭trace。

  • log 打开、关闭、获取、打印log。

  • statistics 各种统计信息

  • get_status 获取进程状态

  • suspend 挂起进程

  • resume 恢复挂起的进程

  • terminate 终止进程


event处理

一个系统会产生各种事件。事件管理员(event manager)是一个erlang进程,可以接收各种特定的事件。 事件回调(event handler)负责处理事件。

事件处理遵从OTP的思想把事件流转的基础设施和具体业务分离。 基础设施包括启停manager、发送event、发送同步请求、消息路由、增删改handler等。 业务相关包括事件定义、handler的具体业务实现等等。

gen_event是OTP的事件处理模组,包含上述基础设施。 https://erldocs.com/current/stdlib/gen_event.html

ide里新建一个gen_event回调模板,取名alarm。 可以看一遍里面的注释,配合gen_event官方文档。

gen_event:add_handler 添加handler。可添加多个handler。

gen_event:notify发异步消息。不指定handler。 gen_event:sync_notify发同步消息。不指定handler。 走handle_event处理。 所有handler都会收到。

gen_event:call 发同步消息,指定handler。 走handle_call处理。

gen_event:delete_handler 删除某个handler。 会走terminate。

gen_event:swap_handler 替换handler。 会走被替换handler的terminate,新handler的init。

gen_event:stop 停止manager。

测试: 启动,添加handler。handle_event里添加打印,即可看到事件打印。

alarm:start_link(),
gen_event:add_handler(alarm, alarm, []),
gen_event:notify(alarm, {wtf, 666}),

Supervisor

监控机制帮助程序员处理各种异常情况。可设定各种策略,规范程序行为,让其表现一致。 https://erldocs.com/current/stdlib/supervisor.html

监控树

  • 监控者(supervisor)本身也是一个进程,负责管理和监控子进程。 它创建子进程并把自己和它们link起来,这样能监控到子进程的退出和一些异常状态。然后采取预定措施。 子进程也可以是一个监控者。这样可形成一个树状结构。

  • 系统的容错(fault tolerance)就是依靠监控树实现。

  • 我们先自己做一个监控系统,以便更好地理解erlang和监控。

  • 起一个普通的erlang模组,起名my_super。 初始化时接收一个子任务信息的list,按信息启动子进程。 任何子进程出现异常,把它重启。 子进程正常结束的话,把它移出监控。

-module(my_super).
-author("raide").

%% API
-export([start/2, init/1, stop/1]).

% 入口。起进程,调init,传入任务列表。
start(Name, Tasks) ->
    io:format("my_super start~n"),
    register(Name, Pid = spawn(?MODULE, init, [Tasks])),
    {ok, Pid}.

% 结束
stop(Name) -> Name ! stop.

% 对与每个task(模组M,函数F,参数A)。用apply调用F。
% 规定F要用spawn_link起子进程,且返回的第二个值为子进程的Pid。
% 用element取第二个值即Pid,和{M, F, A}组成新的list项。
% 返回[{Pid, {M, F, A}}, ...]形式的list
start_tasks(Tasks) ->
    [{element(2, apply(M, F, A)), {M, F, A}} || {M, F, A} <- Tasks].


init(Tasks) ->
    io:format("my_super init~n"),

    % 设置捕获exit消息
    process_flag(trap_exit, true),

    % 启动所有task,并进入loop等消息。
    loop(start_tasks(Tasks)).

% 进程结束时会给link的进程发消息{'EXIT', Pid, Reason}
% Reason可能为normal,kill,Other。
loop(Children) ->
    io:format("my_super loop~n"),
    receive
        {'EXIT', Pid, normal} ->
            % 正常退出,删除Pid对应的任务。继续loop。
            io:format("my_super loop ~p~n", [{'EXIT', Pid, normal}]),
            loop(lists:keydelete(Pid, 1, Children));
        {'EXIT', Pid, Reason} ->
            % 其他原因。重启任务。
            io:format("my_super loop ~p~n", [{'EXIT', Pid, Reason}]),
            {_, {M, F, A}} = lists:keyfind(Pid, 1, Children),
            {ok, NewPid} = apply(M, F, A),
            NewChildren = lists:keyreplace(Pid, 1, Children, {NewPid, {M, F, A}}),
            loop(NewChildren);
        stop ->
            io:format("my_super stop~n"),
            % 杀掉所有任务
            % foreach对list里每个元素调用自定义函数
            lists:foreach(fun({Pid, _}) -> exit(Pid, kill) end, Children)
    end.

再做一个app模组,处理实际业务。

-module(app).
-author("raide").

%% API
-export([start_link/0, init/0]).

start_link() ->
    io:format("app start_link~n"),
    {ok, spawn_link(?MODULE, init, [])}.

init() ->
    io:format("app init~n"),
    register(?MODULE, self()),
    loop().

loop() ->
    io:format("app loop~n"),
    receive
        Any ->
            io:format("app receive ~p~n", [Any]),
            loop()
    end.

注意export,否则可能出现undef的异常。

在shell里测试:

可用observer观察运行时的各种信息
observer:start().

启动
my_super:start(my_super, [{app, start_link, []}]).

获取app任务的pid并强行exit。observer里可以观察到老的app消失,出现一个新的app。
Pid = whereis(app).
exit(Pid, kill).

再次获取app的pid。可以看到Pid变了。
PP = whereis(app).

stop全部
my_super:stop(my_super).

OTP supervisor

ide创建supervisor mini模板。 模板代码很少,只要按规定在init回调里填上基本的配置即可。

一个任务的定义

AChild = #{id => 'AName',
        start => {'AModule', start_link, []},
        restart => permanent,
        shutdown => 2000,
        type => worker,
        modules => ['AModule']},
  • id是任务的名字。

  • start是启动信息。包含模组名,启动函数,参数。即({M, F, A})。

  • restart是任务终止后的重启策略。 permanent总是重启。temporary不重启。transient,只在非正常终止时重启()

  • shutdown定义如何终止子进程。 brutal_kill是用exit(Child,kill)直接发kill。 填数字的话指定了超时时间,用exit(Child,shutdown)发shutdown,等待子进程回复。如果超时,再发kill。

  • type定义子进程类型worker或supervisor。 一共两种类型,非常清晰。

  • modules即start里的AModule。

实际可能定义多种任务比如A,B,C。

init回调须返回{ok,{SupFlags,[ChildSpec]}}。

ChildSpec是任务list即[A, B, C]。 SupFlags是这个supervisor的性质:

  • strategy one_for_one 一个子进程挂了,只重启这个子进程。 one_for_all 一个子进程挂了,重启所有子进程。 rest_for_one 一个子进程挂了,按所有子进程的启动顺序,从这个子进程开始,后边的一次重启。 simple_one_for_one 只接受一种任务。动态监控。

  • intensity和period 防止无限生成大量子进程。限制period秒内最多只能重启intensity次。 超出的话终止所有子进程且supervisor自己也会终止。


动态子进程

之前的supervisor初始化方式是静态的,在init里定义好属性。 还可以动态地处理子进程。

supervisor:start_child
supervisor:terminate_child
supervisor:delete_child
supervisor:restart_child
...

非OTP的兼容

有时需要兼容一些非OTP的模组,比如老代码。 可用supervisor_bridge进行处理,把它加入监控树。


FSM

https://erlang.org/doc/design_principles/statem.html

erlang自带一个状态机模组。可方便地实现状态机应用。这里不详细看了。 大体上就是定义好所有状态、进入/离开状态的回调等等。然后以消息去驱动状态轮转。


总结

  • 进一步学习了gen_server,supervisor等otp组件。

  • 做了一些小程序,更多了解了erlang。为以后打下基础。