1 Star 0 Fork 2.9K

afsoft/dgiot

forked from dgiot开源社区/dgiot 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
emqx_frame_SUITE.erl 25.94 KB
一键复制 编辑 原始数据 按行查看 历史
lsxredrain 提交于 2022-01-07 16:30 . fix: add make ci
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_frame_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").
all() ->
[{group, parse},
{group, connect},
{group, connack},
{group, publish},
{group, puback},
{group, subscribe},
{group, suback},
{group, unsubscribe},
{group, unsuback},
{group, ping},
{group, disconnect},
{group, auth}
].
groups() ->
[{parse, [parallel],
[t_parse_cont,
t_parse_frame_too_large,
t_parse_frame_malformed_variable_byte_integer
]},
{connect, [parallel],
[t_serialize_parse_v3_connect,
t_serialize_parse_v4_connect,
t_serialize_parse_v5_connect,
t_serialize_parse_connect_without_clientid,
t_serialize_parse_connect_with_will,
t_serialize_parse_bridge_connect
]},
{connack, [parallel],
[t_serialize_parse_connack,
t_serialize_parse_connack_v5
]},
{publish, [parallel],
[t_parse_sticky_frames,
t_serialize_parse_qos0_publish,
t_serialize_parse_qos1_publish,
t_serialize_parse_qos2_publish,
t_serialize_parse_publish_v5
]},
{puback, [parallel],
[t_serialize_parse_puback,
t_serialize_parse_puback_v3_4,
t_serialize_parse_puback_v5,
t_serialize_parse_pubrec,
t_serialize_parse_pubrec_v5,
t_serialize_parse_pubrel,
t_serialize_parse_pubrel_v5,
t_serialize_parse_pubcomp,
t_serialize_parse_pubcomp_v5
]},
{subscribe, [parallel],
[t_serialize_parse_subscribe,
t_serialize_parse_subscribe_v5
]},
{suback, [parallel],
[t_serialize_parse_suback,
t_serialize_parse_suback_v5
]},
{unsubscribe, [parallel],
[t_serialize_parse_unsubscribe,
t_serialize_parse_unsubscribe_v5
]},
{unsuback, [parallel],
[t_serialize_parse_unsuback,
t_serialize_parse_unsuback_v5
]},
{ping, [parallel],
[t_serialize_parse_pingreq,
t_serialize_parse_pingresp
]},
{disconnect, [parallel],
[t_serialize_parse_disconnect,
t_serialize_parse_disconnect_v5
]},
{auth, [parallel],
[t_serialize_parse_auth_v5]
}].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
t_parse_cont(_) ->
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}),
ParseState = emqx_frame:initial_parse_state(),
<<HdrBin:1/binary, LenBin:1/binary, RestBin/binary>> = serialize_to_binary(Packet),
{more, ContParse} = emqx_frame:parse(<<>>, ParseState),
{more, ContParse1} = emqx_frame:parse(HdrBin, ContParse),
{more, ContParse2} = emqx_frame:parse(LenBin, ContParse1),
{more, ContParse3} = emqx_frame:parse(<<>>, ContParse2),
{ok, Packet, <<>>, _} = emqx_frame:parse(RestBin, ContParse3).
t_parse_frame_too_large(_) ->
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 256})),
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
t_parse_frame_malformed_variable_byte_integer(_) ->
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>,
ParseState = emqx_frame:initial_parse_state(#{}),
?catch_error(malformed_variable_byte_integer,
emqx_frame:parse(MalformedPayload, ParseState)).
t_parse_frame_variable_byte_integer(_) ->
Bin = <<2#10010011, 2#10000000, 2#10001000, 2#10011001, 2#10101101, 2#00110010>>,
?catch_error(malformed_variable_byte_integer,
emqx_frame:parse_variable_byte_integer(Bin)).
t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
111,99,97>>,
Packet = ?CONNECT_PACKET(
#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>,
clientid = <<"mosqpub/10451-iMac.loca">>,
clean_start = true,
keepalive = 60
}),
{ok, Packet, <<>>, PState} = emqx_frame:parse(Bin),
?assertMatch({none, #{version := ?MQTT_PROTO_V3}}, PState).
t_serialize_parse_v4_connect(_) ->
Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,
98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
Packet = ?CONNECT_PACKET(
#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
clientid = <<"mosqpub/10451-iMac.loca">>,
clean_start = true,
keepalive = 60
}),
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
t_serialize_parse_v5_connect(_) ->
Props = #{'Session-Expiry-Interval' => 60,
'Receive-Maximum' => 100,
'Maximum-QoS' => ?QOS_2,
'Retain-Available' => 1,
'Maximum-Packet-Size' => 1024,
'Topic-Alias-Maximum' => 10,
'Request-Response-Information' => 1,
'Request-Problem-Information' => 1,
'Authentication-Method' => <<"oauth2">>,
'Authentication-Data' => <<"33kx93k">>
},
WillProps = #{'Will-Delay-Interval' => 60,
'Payload-Format-Indicator' => 1,
'Message-Expiry-Interval' => 60,
'Content-Type' => <<"text/json">>,
'Response-Topic' => <<"topic">>,
'Correlation-Data' => <<"correlateid">>,
'User-Property' => [{<<"k">>, <<"v">>}]
},
Packet = ?CONNECT_PACKET(
#mqtt_packet_connect{proto_name = <<"MQTT">>,
proto_ver = ?MQTT_PROTO_V5,
is_bridge = false,
clean_start = true,
clientid = <<>>,
will_flag = true,
will_qos = ?QOS_1,
will_retain = false,
keepalive = 60,
properties = Props,
will_props = WillProps,
will_topic = <<"topic">>,
will_payload = <<>>,
username = <<"device:1">>,
password = <<"passwd">>
}),
?assertEqual(Packet, parse_serialize(Packet)).
t_serialize_parse_connect_without_clientid(_) ->
Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
clientid = <<>>,
clean_start = true,
keepalive = 60
}),
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
t_serialize_parse_connect_with_will(_) ->
Bin = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,
117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,
105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,
112,117,98,108,105,99>>,
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT},
variable = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
proto_name = <<"MQIsdp">>,
clientid = <<"mosqpub/10452-iMac.loca">>,
clean_start = true,
keepalive = 60,
will_retain = false,
will_qos = ?QOS_1,
will_flag = true,
will_topic = <<"/will">>,
will_payload = <<"willmsg">>,
username = <<"test">>,
password = <<"public">>
}},
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
t_serialize_parse_bridge_connect(_) ->
Bin = <<16,86,0,6,77,81,73,115,100,112,131,44,0,60,0,19,67,95,48,48,58,48,67,
58,50,57,58,50,66,58,55,55,58,53,50,0,48,36,83,89,83,47,98,114,111,107,
101,114,47,99,111,110,110,101,99,116,105,111,110,47,67,95,48,48,58,48,
67,58,50,57,58,50,66,58,55,55,58,53,50,47,115,116,97,116,101,0,1,48>>,
Topic = <<"$SYS/broker/connection/C_00:0C:29:2B:77:52/state">>,
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT},
variable = #mqtt_packet_connect{clientid = <<"C_00:0C:29:2B:77:52">>,
proto_ver = 16#03,
proto_name = <<"MQIsdp">>,
is_bridge = true,
will_retain = true,
will_qos = ?QOS_1,
will_flag = true,
clean_start = false,
keepalive = 60,
will_topic = Topic,
will_payload = <<"0">>
}},
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)),
Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{is_bridge = true}),
?assertEqual(Packet1, parse_serialize(Packet1)).
t_serialize_parse_connack(_) ->
Packet = ?CONNACK_PACKET(?RC_SUCCESS),
?assertEqual(<<32,2,0,0>>, serialize_to_binary(Packet)),
?assertEqual(Packet, parse_serialize(Packet)).
t_serialize_parse_connack_v5(_) ->
Props = #{'Session-Expiry-Interval' => 60,
'Receive-Maximum' => 100,
'Maximum-QoS' => ?QOS_2,
'Retain-Available' => 1,
'Maximum-Packet-Size' => 1024,
'Assigned-Client-Identifier' => <<"id">>,
'Topic-Alias-Maximum' => 10,
'Reason-String' => <<>>,
'Wildcard-Subscription-Available' => 1,
'Subscription-Identifier-Available' => 1,
'Shared-Subscription-Available' => 1,
'Server-Keep-Alive' => 60,
'Response-Information' => <<"response">>,
'Server-Reference' => <<"192.168.1.10">>,
'Authentication-Method' => <<"oauth2">>,
'Authentication-Data' => <<"33kx93k">>
},
Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_parse_sticky_frames(_) ->
Payload = lists:duplicate(10, 0),
P = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = ?QOS_0,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"a/b">>,
packet_id = undefined},
payload = iolist_to_binary(Payload)
},
Bin = serialize_to_binary(P),
Size = size(Bin),
<<H:(Size-2)/binary, TailTwoBytes/binary>> = Bin,
{more, PState1} = emqx_frame:parse(H), %% needs 2 more bytes
%% feed 3 bytes as if the next 1 byte belongs to the next packet.
{ok, _, <<42>>, PState2} = emqx_frame:parse(iolist_to_binary([TailTwoBytes, 42]), PState1),
?assertMatch({none, _}, PState2).
t_serialize_parse_qos0_publish(_) ->
Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>,
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = ?QOS_0,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
packet_id = undefined},
payload = <<"hello">>},
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})).
t_serialize_parse_qos1_publish(_) ->
Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>,
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
dup = false,
qos = ?QOS_1,
retain = false},
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
packet_id = 1},
payload = <<"haha">>},
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))),
%% strict_mode = false
_ = parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
t_serialize_parse_qos2_publish(_) ->
Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
Bin = <<52,9,0,5,84,111,112,105,99,0,1>>,
?assertEqual(Packet, parse_serialize(Packet)),
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))),
%% strict_mode = false
_ = parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
t_serialize_parse_publish_v5(_) ->
Props = #{'Payload-Format-Indicator' => 1,
'Message-Expiry-Interval' => 60,
'Topic-Alias' => 16#AB,
'Response-Topic' => <<"reply">>,
'Correlation-Data' => <<"correlation-id">>,
'Subscription-Identifier' => 1,
'Content-Type' => <<"text/json">>},
Packet = ?PUBLISH_PACKET(?QOS_1, <<"$share/group/topic">>, 1, Props, <<"payload">>),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_puback(_) ->
Packet = ?PUBACK_PACKET(1),
?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)),
?assertEqual(Packet, parse_serialize(Packet)),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))),
%% strict_mode = false
?PUBACK_PACKET(0) = parse_serialize(?PUBACK_PACKET(0), #{strict_mode => false}).
t_serialize_parse_puback_v3_4(_) ->
Bin = <<64,2,0,1>>,
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = 1},
?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V3)),
?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V4)),
?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V3})),
?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V4})).
t_serialize_parse_puback_v5(_) ->
Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_pubrec(_) ->
Packet = ?PUBREC_PACKET(1),
?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)),
?assertEqual(Packet, parse_serialize(Packet)),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))),
%% strict_mode = false
?PUBREC_PACKET(0) = parse_serialize(?PUBREC_PACKET(0), #{strict_mode => false}).
t_serialize_parse_pubrec_v5(_) ->
Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_pubrel(_) ->
Packet = ?PUBREL_PACKET(1),
Bin = serialize_to_binary(Packet),
?assertEqual(<<6:4,2:4,2,0,1>>, Bin),
?assertEqual(Packet, parse_serialize(Packet)),
%% PUBREL with bad qos 0
Bin0 = <<6:4,0:4,2,0,1>>,
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
%% strict_mode = false
?PUBREL_PACKET(0) = parse_serialize(?PUBREL_PACKET(0), #{strict_mode => false}),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))).
t_serialize_parse_pubrel_v5(_) ->
Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_pubcomp(_) ->
Packet = ?PUBCOMP_PACKET(1),
Bin = serialize_to_binary(Packet),
?assertEqual(<<7:4,0:4,2,0,1>>, Bin),
?assertEqual(Packet, parse_serialize(Packet)),
%% strict_mode = false
?PUBCOMP_PACKET(0) = parse_serialize(?PUBCOMP_PACKET(0), #{strict_mode => false}),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))).
t_serialize_parse_pubcomp_v5(_) ->
Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_subscribe(_) ->
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
Bin = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
TopicOpts = #{nl => 0 , rap => 0, rh => 0, qos => 2},
TopicFilters = [{<<"TopicA">>, TopicOpts}],
Packet = ?SUBSCRIBE_PACKET(2, TopicFilters),
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
%% SUBSCRIBE with bad qos 0
Bin0 = <<?SUBSCRIBE:4,0:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
%% strict_mode = false
_ = parse_to_packet(Bin0, #{strict_mode => false}),
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
%% strict_mode = false
_ = parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters), #{strict_mode => false}),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))),
?catch_error(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))).
t_serialize_parse_subscribe_v5(_) ->
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0}},
{<<"TopicQos1">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0}}],
Packet = ?SUBSCRIBE_PACKET(3, #{'Subscription-Identifier' => 16#FFFFFFF}, TopicFilters),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_suback(_) ->
Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]),
?assertEqual(Packet, parse_serialize(Packet)),
%% strict_mode = false
_ = parse_serialize(?SUBACK_PACKET(0, [?QOS_0]), #{strict_mode => false}),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))).
t_serialize_parse_suback_v5(_) ->
Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>,
'User-Property' => [{<<"key">>, <<"value">>}]},
[?QOS_0, ?QOS_1, 128]),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_unsubscribe(_) ->
%% UNSUBSCRIBE(Q1, R1, D0, PacketId=2, TopicTable=[<<"TopicA">>])
Bin = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]),
?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
%% UNSUBSCRIBE with bad qos
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
Bin0 = <<?UNSUBSCRIBE:4,0:4,10,0,2,0,6,84,111,112,105,99,65>>,
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
%% strict_mode = false
_ = parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]), #{strict_mode => false}),
%% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))).
t_serialize_parse_unsubscribe_v5(_) ->
Props = #{'User-Property' => [{<<"key">>, <<"val">>}]},
Packet = ?UNSUBSCRIBE_PACKET(10, Props, [<<"Topic1">>, <<"Topic2">>]),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_unsuback(_) ->
Packet = ?UNSUBACK_PACKET(10),
?assertEqual(Packet, parse_serialize(Packet)).
t_serialize_parse_unsuback_v5(_) ->
Packet = ?UNSUBACK_PACKET(10, #{'Reason-String' => <<"Not authorized">>,
'User-Property' => [{<<"key">>, <<"val">>}]},
[16#87, 16#87, 16#87]),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_pingreq(_) ->
PingReq = ?PACKET(?PINGREQ),
?assertEqual(PingReq, parse_serialize(PingReq)).
t_serialize_parse_pingresp(_) ->
PingResp = ?PACKET(?PINGRESP),
?assertEqual(PingResp, parse_serialize(PingResp)).
t_parse_disconnect(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(<<224, 0>>)).
t_serialize_parse_disconnect(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
?assertEqual(Packet, parse_serialize(Packet)).
t_serialize_parse_disconnect_v5(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS,
#{'Session-Expiry-Interval' => 60,
'Reason-String' => <<"server_moved">>,
'Server-Reference' => <<"192.168.1.10">>
}),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
t_serialize_parse_auth_v5(_) ->
Packet = ?AUTH_PACKET(?RC_SUCCESS,
#{'Authentication-Method' => <<"oauth2">>,
'Authentication-Data' => <<"3zekkd">>,
'Reason-String' => <<"success">>,
'User-Property' => [{<<"key1">>, <<"val1">>},
{<<"key2">>, <<"val2">>}]
}),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})),
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5,
strict_mode => true})).
parse_serialize(Packet) ->
parse_serialize(Packet, #{strict_mode => true}).
parse_serialize(Packet, Opts) when is_map(Opts) ->
Ver = maps:get(version, Opts, ?MQTT_PROTO_V4),
Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)),
ParseState = emqx_frame:initial_parse_state(Opts),
{ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState),
NPacket.
serialize_to_binary(Packet) ->
iolist_to_binary(emqx_frame:serialize(Packet)).
serialize_to_binary(Packet, Ver) ->
iolist_to_binary(emqx_frame:serialize(Packet, Ver)).
parse_to_packet(Bin, Opts) ->
PState = emqx_frame:initial_parse_state(Opts),
{ok, Packet, <<>>, _} = emqx_frame:parse(Bin, PState),
Packet.
payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Erlang
1
https://gitee.com/afsoft_admin/dgiot.git
[email protected]:afsoft_admin/dgiot.git
afsoft_admin
dgiot
dgiot
master

搜索帮助