Erlang服务程序和OTP基础#
之前已经学过erlang的基本概念和操作。 http://xiongchen.cc/post/29
这次学习实际的应用和OTP的基础。 api文档 https://erldocs.com/ https://www.erlang.org/docs
环境 erlang版本用的最新的otp24.0。 ide用的是Intellij idea 2021.2社区版。 ide进设置,安装erlang插件。 创建project,选erlang。完了开始调代码。
基本的socket流程#
TCP服务器#
erlang的gen_tcp提供tcp socket的接口。 可用gen_tcp来实现完整的tcp服务程序。 gen代表generic。
src目录下新建erlang module,起名为tcp_app。
-module(tcp_app).
-author("xc").
%% API
-export([start_server/1, send_data/3]). % 暴露两个接口给外部调用,后面的数字表示参数个数。
loop(Socket) ->
io:format("server loop ~n"),
% 收消息
receive
% 匹配各种消息
{tcp_closed, Socket} -> % socket关闭
io:format("server recv tcp_closed ~n");
{tcp, Socket, Data} -> % 正常的tcp数据
io:format("server recv ~p~n", [Data]),
% 发数据
gen_tcp:send(Socket, Data),
% active once模式。每次收到消息后要主动设置一下,才会收到后续消息。
inet:setopts(Socket, [{active, once}]),
% 继续loop
loop(Socket)
end.
start_accept(Listen) ->
io:format("server start_accept ~n"),
% 开始accept。一旦成功马上另起一个accept进程。自己继续处理连上来的socket。
{ok, Socket} = gen_tcp:accept(Listen),
spawn(fun() -> start_accept(Listen) end),
loop(Socket).
start_server(Port) -> % 启动服务器的入口
io:format("server start_server ~n"),
% 标准的socket流程。先listen。第二个参数是各种选项。binary表示数据包是以二进制数据展示。
% {packet, N}是erlang处理tcp包分包的一个机制,
% {active, once}。socket是active还是passive。可选true,false,once
{ok, Listen} = gen_tcp:listen(Port, [binary, {packet, 0}, {active, once}]),
% listen成功后开始start_accept
spawn(fun() -> start_accept(Listen) end).
send_data(Host, Port, Data) ->
% 客户端连接服务器连接ip端口。
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),
% 发一条数据并接收
ok = gen_tcp:send(Socket, Data),
receive
Any ->
io:format("client recv ~p~n", [Any])
end.
有两个重要选项
{packet, 0 | 1 | 2 | 4 | raw | sunrm | asn1 | cdr | fcgi | line | tpkt | http | httph | http_bin | httph_bin} |
https://erldocs.com/current/kernel/inet.html#packet 定义数据包的格式。选0不处理数据。
选1、2、4。数据包里会添加一个1或2或4字节的header,包含实际数据的长度。4字节可表示2G字节的数据包。
实际的作用:对于tcp数据,可能一次收发不全一条完整的数据,或者多收部分数据。
这样造成接收端分不清数据边界,不能直接解析数据。
所以我们都要定一个数据包的格式协议,确定消息的边界,这样收发两端才能正确通信。
这个选项其实就是一个最简单的协议,把每条完整数据的长度定死。
这样buffer里每次累积到协定长度的数据,就认为是一个完整的数据包。这样loop里每次receive的数据就是完整的一条数据了,不用再费劲组包了。
还包含http等其他内置协议。如果是自定义协议还是得自己实现。
{active, true | false | once | -32768..32767} |
https://erldocs.com/current/kernel/inet.html#setopts/2socket是active还是passive。可选true,false,once。这是erlang控制数据包的获取的一个机制。
如果是true,loop进程的控制进程不会进行控制,只要有数据就会推到loop里。
如果是false,loop必须调gen_tcp:recv才能收到数据。
如果是once,loop每次收到数据后必须调一次
inet:setopts(Sock, [{active, once}])
才能再次收到一个消息。 还可以设为一个数N,默认收N条数据。
测试程序#
同样src目录下新建erlang module,起名为app。
-module(app).
-author("xc").
%% API
-export([gg/0]).
-import(tcp_app, [start_server/1, send_data/1]).
gg()->
io:format("ggwtf~n"),
tcp_app:start_server(9999),
tcp_app:send_data("127.0.0.1", 9999, "wtf1"),
tcp_app:send_data("127.0.0.1", 9999, "wtf2"),
tcp_app:send_data("127.0.0.1", 9999, "wtf3").
暴露的接口为gg。
run的配置里module and function里填上”app gg”。即运行app模块的gg函数。再run即可看到打印。
ide里build后在out/production/xxx/目录会生成所有代码的beam文件。
可以进这个文件夹运行erlang的命令行程序erl进行操作。类似python的idle shell。
进shell直接运行 tcp_app:start_server(9999).
等等。
UDP服务器#
erlang的gen_udp提供udp socket的接口。
可用gen_udp来实现完整的udp服务程序。
src目录下新建erlang module,起名为udp_app。
udp比tcp简单一些。
-module(udp_app).
-author("xc").
%% API
-export([start_server/0, send_data/1]).
start_server() ->
spawn(fun() -> do_start_server(9999) end).
loop(Socket) ->
receive
{udp, Socket, Host, Port, Bin} = Msg ->
io:format("server recv ~p~n", [Msg]),
gen_udp:send(Socket, Host, Port, Bin),
loop(Socket)
end.
do_start_server(Port) ->
{ok, Socket} = gen_udp:open(Port, [binary]),
io:format("server opened socket ~p~n", [Socket]),
loop(Socket).
send_data(Data) ->
{ok, Socket} = gen_udp:open(0, [binary]),
ok = gen_udp:send(Socket, "127.0.0.1", 9999, Data),
receive
{udp, Socket, Host, Port, D} ->
io:format("client recv ~p ~p ~p ~n", [Host, Port, D]);
Any ->
io:format("client recv ~p~n", [Any])
end.
OTP#
OTP全称Open Telecom Platform。这个名称会造成误解。
其实它包括一系列的框架、库、工具,可用来搭建大型、可扩展、可动态升级、可容灾的高并发分布式通信系统。
OTP的核心思想是所谓的OTP behavior,其实就是抽象的框架。
有些常用的程序架构比如C/S、状态机、事件、监控等等。把他们做成程序框架,就可以方便地复用了。
下面要学习最常用的gen_server。
为更好地理解otp,从最简单的抽象开始自己写代码,一点点加功能,最后接近系统自带的gen_server。
server1#
-module(server1).
-author("xc").
%% API
-export([start/2, rpc/2]).
loop(Name, Mod, State) ->
io:format("loop~n"),
% 收消息
receive
{From, Request} ->
% 调用应用模块的handle函数
{Response, State1} = Mod:handle(Request, State),
% 发送结果
From ! {Name, Response},
% 继续loop
loop(Name, Mod, State1)
end.
start(Name, Mod) ->
% 把名字和进程绑定,后续就能直接给这个名字发消息。
register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).
rpc(Name, Request) ->
io:format("recv rpc~n"),
% 给loop进程发消息
Name ! {self(), Request},
% 收回复
receive
{Name, Response} ->
Response
end.
server1大致是接收一个应用模块(Mod),Mod有一个handle函数可处理实际业务。
起进程不断收消息。消息的来源是rpc。
收到消息时调用应用模块的handle函数,把结果返给rpc,rpc再返给调用者。
总结一下,思想就是实现一个通用的框架,接收一个应用模块,规定好应用模块handle的参数和返回格式。
应用模块只管实现自己的业务。server1负责实现消息的传递、rpc和进程管理。
这也是otp的主要思想,实现通用的底层设施,提供给应用模块。应用模块专心处理业务就好。
-module(name_server).
-author("xc").
%% API
-export([]).
-import(server1, [rpc/2]).
-export([init/0, add/2, find/1, handle/2, start/0]).
start() ->
% 起服务
server1:start(name_server, name_server).
add(Name, Place) ->
% 发rpc
Result = rpc(name_server, {add, Name, Place}),
io:format("add result ~p~n", [Result]).
find(Name) ->
% 发rpc
Result = rpc(name_server, {find, Name}),
io:format("find result ~p~n", [Result]).
init() ->
% 业务初始化
dict:new().
% 实现业务
handle({add, Name, Place}, Dict) ->
io:format("handle1~n"),
{ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict) ->
io:format("handle2~n"),
{dict:find(Name, Dict), Dict}.
应用模块按规定实现handle。接收两个参数,Request和State,处理Request,返回新的State。
这里State是一个dict,某种意义上也可以说是一种状态。
这里实现的是一个kv存储。
应用模块暴露给最终用户的api基本都是业务相关。直接发rpc,带上Request。
name_server:start(),
name_server:add(gg, "wtf"),
name_server:find(wtf),
name_server:find(gg).
server2#
server1没有错误处理。如果业务模块出了异常,整个server完蛋。
在name_server的handle里加一个throw(“wtfffff”)即可复现。
server2添加了错误处理。对业务模块的handle进行catch。
如果出了异常,可以报告给业务模块。并且保持老的State,继续loop。
-module(server2).
-author("xc").
%% API
-export([start/2, rpc/2]).
loop(Name, Mod, OldState) ->
io:format("loop~n"),
% 收消息
receive
{From, Request} ->
% catch应用模块的handle函数
try Mod:handle(Request, OldState) of
{Response, NewState} ->
% 发送结果
From ! {Name, ok, Response},
% 继续loop
loop(Name, Mod, NewState)
catch
_:Why ->
io:format("Server ~p request ~p ~ncaused exception ~p~n", [Name, Request, Why]),
% 报告crash
From ! {Name, crash},
% 继续loop
loop(Name, Mod, OldState)
end
end.
start(Name, Mod) ->
% 把名字和进程绑定,后续就能直接给这个名字发消息。
register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).
rpc(Name, Request) ->
io:format("recv rpc~n"),
% 给loop进程发消息
Name ! {self(), Request},
% 收回复
receive
{Name, crash} -> crash; %exit(rpc);
{Name, ok, Response} -> Response
end.
server3#
当前的name_server只有add和find两个业务功能。
如果要加新功能,通常需要更新name_server的代码,重启服务。
erlang可以实现热更新代码,不用重启服务。原理就是把传给server框架的业务模块直接实时替换。
先做一个新的name_server_v2。新加delete和all_names功能。
-module(name_server_v2).
-author("xc").
%% API
-export([]).
-import(server2, [start/2, rpc/2]).
-export([init/0, add/2, find/1, handle/2, start/0, delete/1, all_names/0]).
start() ->
% 起服务
server2:start(name_server, name_server_v2).
add(Name, Place) ->
% 发rpc
Result = rpc(name_server, {add, Name, Place}),
io:format("add result ~p~n", [Result]).
find(Name) ->
% 发rpc
Result = rpc(name_server, {find, Name}),
io:format("find result ~p~n", [Result]).
delete(Name) ->
% 发rpc
Result = rpc(name_server, {delete, Name}),
io:format("delete result ~p~n", [Result]).
all_names() ->
% 发rpc
Result = rpc(name_server, {all_names}),
io:format("all_names result ~p~n", [Result]).
init() ->
% 业务初始化
dict:new().
% 实现业务
handle({add, Name, Place}, Dict) ->
throw("wtfffff"),
io:format("handle1~p~n", [Name]),
{ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict) ->
io:format("handle2~n"),
{dict:find(Name, Dict), Dict};
handle({delete, Name}, Dict) ->
io:format("handle3~p~n", [Name]),
{ok, dict:erase(Name, Dict)};
handle({all_names}, Dict) ->
io:format("handle4~n"),
{dict:fetch_keys(Dict), Dict}.
在server2基础上加一个简单的逻辑就能实现代码更新。 做一个新的函数swap_code接收新的模组,发rpc。 loop里匹配到swap_code时用新的模组起loop即可。
-module(server3).
-author("xc").
%% API
-export([start/2, rpc/2, swap_code/2]).
loop(Name, Mod, OldState) ->
io:format("loop~n"),
% 收消息
receive
{From, {swap_code, NewMod}} -> % 匹配swap_code
From ! {Name, ok, ack},
io:format("swap_code~n"),
% 用新的Mod起loop实现代码更新。
loop(Name, NewMod, OldState);
{From, Request} ->
% catch应用模块的handle函数
try Mod:handle(Request, OldState) of
{Response, NewState} ->
% 发送结果
From ! {Name, ok, Response},
% 继续loop
loop(Name, Mod, NewState)
catch
_:Why ->
io:format("Server ~p request ~p ~ncaused exception ~p~n", [Name, Request, Why]),
% 报告crash
From ! {Name, crash},
% 继续loop
loop(Name, Mod, OldState)
end
end.
start(Name, Mod) ->
% 把名字和进程绑定,后续就能直接给这个名字发消息。
register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).
rpc(Name, Request) ->
io:format("recv rpc~n"),
% 给loop进程发消息
Name ! {self(), Request},
% 收回复
receive
{Name, crash} -> crash; %exit(rpc);
{Name, ok, Response} -> Response
end.
swap_code(Name, Mod) ->
rpc(Name, {swap_code, Mod}).
注意erlang里面有各种模式匹配。比如rpc函数收回复,如果发的消息匹配不了,rpc就会卡在那返回不了。
build项目后在erlang的Eshell里测试。
% 先加载原始的name_server。
server3:start(name_server, name_server).
% 可用find和add。
name_server:add(ggg, "wtf").
% 如果调用delete可以看到报错,找不到函数。
name_server:delete(ggg).
% 这时更新代码。
server3:swap_code(name_server, name_server_v2).
% 之后可以用name_server_v2里新功能了。
name_server_v2:all_names().
name_server_v2:delete(ggg).
这里面还有一些列问题 比如name_server和name_server_v2,调用的模组名字发生了改变。 而且发rpc的第一个参数都是用的name_server,造成老的name_server模组仍可使用。 暂且不管这些。
到此已经了解了gen_server框架的思想。下面看看如何使用正式的gen_server。
OTP之gen_server#
ide里新建erlang文件,可选很多种类型。其中有OTP gen_server和OPT gen_server, minimal。
这是erlang插件提供的代码模板,各创建一个对比一下。
看minimal模板,可以看到样子和之前做的name_server差不多,都是做一些handle实现业务功能,还有启动、关闭、更新代码等辅助。
非minimal的模板包含了较完整的信息,包括各种函数和参数的类型定义。
minimal模板应该是给老手使用的。
下面要实现一个简单的银行系统。为简洁起见在minimal模板上写代码,完整模板作为参考。
银行系统需要存储数据,我们用erlang的ets(erlang term storage)。
ets是erlang自带的跑在内存上的存储系统,简单强大,可存多种类型数据。
创建OPT gen_server, minimal,起名my_bank。
设计功能,定义接口。
start() 启动银行
stop() 关闭银行
new_account(Who) 创建账户
deposit(Who, Amount) 存钱
withdraw(Who, Amount) 取钱
接口里定好rpc请求的格式,用
gen_server:call
发给gen_server。handle_call里根据定好的rpc请求,实现各个业务。
-module(my_bank).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
%-record(my_bank_state, {}).
-export([start/0, stop/0, new_account/1, deposit/2, withdraw/2]).
%%%===================================================================
%%% Spawning and gen_server implementation
%%%===================================================================
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
% 业务初始化。创建ets。
init([]) -> {ok, ets:new(?MODULE, [])}.
% 实现所有业务
handle_call(stop, _From, Tab) ->
{stop, normal, stopped, Tab};
handle_call({new, Who}, _From, Tab) ->
% 统一先找用户。
Reply = case ets:lookup(Tab, Who) of
[] -> % 如果不存在。insert{名字, 初始金额}
ets:insert(Tab, {Who, 0}),
{welcome, Who};
[_] -> % 如果存在
{Who, you_exists}
end,
{reply, Reply, Tab};
handle_call({add, Who, Amount}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] ->
{Who, no_account};
[{Who, Balance}] ->
% 更新为新金额
NewB = Balance + Amount,
ets:insert(Tab, {Who, NewB}),
{ok}
end,
{reply, Reply, Tab};
handle_call({remove, Who, Amount}, _From, Tab) ->
Reply = case ets:lookup(Tab, Who) of
[] ->
{Who, no_account};
[{Who, Balance}] when Amount =< Balance ->
% 更新为新金额
NewB = Balance - Amount,
ets:insert(Tab, {Who, NewB}),
{ok};
[{Who, Balance}] ->
% 金额不足
{no_enough_mineral, Balance}
end,
{reply, Reply, Tab}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
% 启动银行
start() -> start_link().
% 关闭银行
stop() -> gen_server:call(?MODULE, stop).
% 创建账户
new_account(Who) -> gen_server:call(?MODULE, {new, Who}).
% 存钱
deposit(Who, Amount) -> gen_server:call(?MODULE, {add, Who, Amount}).
% 取钱
withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).
在Eshell里进行各种测试
my_bank:start().
my_bank:withdraw("xc", 100).
my_bank:new_account("xc").
my_bank:new_account("xc").
my_bank:withdraw("xc", 100).
my_bank:deposit("xc", 111).
my_bank:withdraw("xc", 100).
my_bank:stop().
有个问题,如果调一个不存在的函数,会造成服务死掉。必须重新start。不知道原因,后续再研究。
总结#
学习了erlang的socket流程。
学习了erlang OTP的基本思想。
熟悉了gen_server的基本使用。