diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java index cb6d17b619f128b6376236f54806ea4c049095b9..e34f7c8ab2697a988d3869a33468ae79f114b8da 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/interceptor/MessageProxy.java @@ -78,6 +78,7 @@ public class MessageProxy { .topic(header.topicName()) .retain(fixedHeader.isRetain()) .qos(fixedHeader.qosLevel().value()) + .properties(header.properties()) .build(); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java index 19c340dac9389abc45642e94393119623571ad09..2ab22342784775c9b8d3cc01102251df3380543e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/HeapMqttMessage.java @@ -1,6 +1,7 @@ package io.github.quickmsg.common.message; import io.github.quickmsg.common.utils.JacksonUtil; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.util.internal.StringUtil; import lombok.AllArgsConstructor; import lombok.Builder; @@ -31,6 +32,8 @@ public class HeapMqttMessage { private byte[] message; + private MqttProperties properties; + public Map getKeyMap() { Map keys = new HashMap<>(5); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java index d6a16539cc6c0dfaa5f0e63141210ed658e9ea7f..f3ef36a646bdead695c6d8c7d1b34266ebb2e84a 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/MqttMessageBuilder.java @@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.*; import java.util.List; +import java.util.Map; + +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; /** @@ -11,6 +18,32 @@ import java.util.List; */ public class MqttMessageBuilder { + private static MqttProperties genMqttProperties(Map userPropertiesMap) { + MqttProperties mqttProperties = null; + if (userPropertiesMap != null) { + mqttProperties = new MqttProperties(); + MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); + for (Map.Entry entry : userPropertiesMap.entrySet()) { + userProperties.add(entry.getKey(), entry.getValue()); + } + mqttProperties.add(userProperties); + } + return mqttProperties; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, MqttProperties properties) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, properties); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } + + public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message, Map userPropertiesMap) { + MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(topic, messageId, genMqttProperties(userPropertiesMap)); + MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, message); + return mqttPublishMessage; + } public static MqttPublishMessage buildPub(boolean isDup, MqttQoS qoS, int messageId, String topic, ByteBuf message) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qoS, false, 0); @@ -69,8 +102,35 @@ public class MqttMessageBuilder { return new MqttUnsubAckMessage(mqttFixedHeader, variableHeader); } - public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode) { - MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false); + public static MqttConnAckMessage buildConnectAck(MqttConnectReturnCode connectReturnCode, byte version) { + MqttProperties properties = MqttProperties.NO_PROPERTIES; + if (MqttVersion.MQTT_5.protocolLevel() == version) { + properties = new MqttProperties(); + // support retain msg + properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.RETAIN_AVAILABLE.value(), 1)); + // don't support shared subscription + properties.add(new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE.value(), 0)); + // mqtt3.0 error code transform + switch (connectReturnCode) { + case CONNECTION_REFUSED_IDENTIFIER_REJECTED: + connectReturnCode = CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; + break; + case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION: + connectReturnCode = CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION; + break; + case CONNECTION_REFUSED_SERVER_UNAVAILABLE: + connectReturnCode = CONNECTION_REFUSED_SERVER_UNAVAILABLE_5; + break; + case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: + connectReturnCode = CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; + break; + case CONNECTION_REFUSED_NOT_AUTHORIZED: + connectReturnCode = CONNECTION_REFUSED_NOT_AUTHORIZED_5; + break; + + } + } + MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(connectReturnCode, false, properties); MqttFixedHeader mqttFixedHeader = new MqttFixedHeader( MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0X02); return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java index fbc623529cdcbc7e0f9fb110feb2f8899ebc30e4..2d67c21e50281155a26fec9f839a6bfa3c8929a7 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/RetainMessage.java @@ -1,8 +1,13 @@ package io.github.quickmsg.common.message; +import java.util.HashMap; +import java.util.Optional; + import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -22,12 +27,25 @@ public class RetainMessage { private byte[] body; + private String userProperties; + public static RetainMessage of(MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return RetainMessage.builder() .topic(publishVariableHeader.topicName()) .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) .build(); } @@ -37,7 +55,8 @@ public class RetainMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java index a3596af3aa8e4686cf1786d39598480c42efc2d8..f5aecea5ff339c029a868517f2e1d7fb8c243de8 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/message/SessionMessage.java @@ -1,8 +1,13 @@ package io.github.quickmsg.common.message; +import java.util.HashMap; +import java.util.Optional; + import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.utils.JacksonUtil; import io.github.quickmsg.common.utils.MessageUtils; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; @@ -27,6 +32,8 @@ public class SessionMessage { private boolean retain; + private String userProperties; + public static SessionMessage of(String clientIdentifier, MqttPublishMessage mqttPublishMessage) { MqttPublishVariableHeader publishVariableHeader = mqttPublishMessage.variableHeader(); return SessionMessage.builder() @@ -35,6 +42,17 @@ public class SessionMessage { .qos(mqttPublishMessage.fixedHeader().qosLevel().value()) .retain(mqttPublishMessage.fixedHeader().isRetain()) .body(MessageUtils.copyByteBuf(mqttPublishMessage.payload())) + .userProperties(JacksonUtil.map2Json(Optional.ofNullable(publishVariableHeader + .properties() + .getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value())) + .map(list -> { + HashMap propertiesMap = new HashMap<>(list.size()); + list.forEach(property -> { + MqttProperties.StringPair pair = (MqttProperties.StringPair) property.value(); + propertiesMap.put(pair.key, pair.value); + }); + return propertiesMap; + }).orElseGet(HashMap::new))) .build(); } @@ -44,7 +62,8 @@ public class SessionMessage { MqttQoS.valueOf(this.qos), qos > 0 ? mqttChannel.generateMessageId() : 0, topic, - PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body)); + PooledByteBufAllocator.DEFAULT.directBuffer().writeBytes(body), + JacksonUtil.json2Map(userProperties, String.class, String.class)); } } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java index fa197573b91107d755751afe9c9cf6c6e2877aa1..03c2c81eec62f7706ca618f51185e053285c5072 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/MessageUtils.java @@ -67,7 +67,7 @@ public class MessageUtils { MqttPublishVariableHeader mqttPublishVariableHeader = message.variableHeader(); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttFixedHeader newFixedHeader = new MqttFixedHeader(mqttFixedHeader.messageType(), false, mqttQoS, false, mqttFixedHeader.remainingLength()); - MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId); + MqttPublishVariableHeader newHeader = new MqttPublishVariableHeader(mqttPublishVariableHeader.topicName(), messageId, mqttPublishVariableHeader.properties()); return new MqttPublishMessage(newFixedHeader, newHeader, message.payload().copy()); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java index d751f33ba3eb3b42e7f01106fdb31dc4cc4e4ec3..1830de766ffa61a3c925867780b5d15665703d2c 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/cluster/ClusterReceiver.java @@ -56,7 +56,8 @@ public class ClusterReceiver { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, heapMqttMessage.getTopic(), - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())), System.currentTimeMillis(), Boolean.TRUE); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), + heapMqttMessage.getProperties()), System.currentTimeMillis(), Boolean.TRUE); } } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index 6e6bd47a320e2d820938b5e5f7a4f306e5c15a18..b8417ff386716ebfc13abb905e0e334ec22056b5 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -63,12 +63,13 @@ public class ConnectProtocol implements Protocol { ChannelRegistry channelRegistry = mqttReceiveContext.getChannelRegistry(); TopicRegistry topicRegistry = mqttReceiveContext.getTopicRegistry(); MetricManager metricManager = mqttReceiveContext.getMetricManager(); + byte mqttVersion = (byte) mqttConnectVariableHeader.version(); PasswordAuthentication passwordAuthentication = mqttReceiveContext.getPasswordAuthentication(); /*check clientIdentifier exist*/ if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { if (channelRegistry.exists(clientIdentifier)) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttVersion), false).then(mqttChannel.close()); } } else { @@ -78,16 +79,16 @@ public class ConnectProtocol implements Protocol { existMqttChannel.close().subscribe(); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttVersion), false).then(mqttChannel.close()); } } } /*protocol version support*/ - if (MqttVersion.MQTT_3_1_1.protocolLevel() != (byte) mqttConnectVariableHeader.version() - && MqttVersion.MQTT_3_1.protocolLevel() != (byte) mqttConnectVariableHeader.version()) { + if (MqttVersion.MQTT_3_1_1.protocolLevel() != mqttVersion + && MqttVersion.MQTT_5.protocolLevel() != mqttVersion) { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttVersion), false).then(mqttChannel.close()); } /*password check*/ @@ -147,11 +148,11 @@ public class ConnectProtocol implements Protocol { eventRegistry.registry(Event.CONNECT, mqttChannel, message, mqttReceiveContext); - return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED), false) + return mqttChannel.write(MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttVersion), false) .then(Mono.fromRunnable(() -> sendOfflineMessage(mqttReceiveContext.getMessageRegistry(), mqttChannel))); } else { return mqttChannel.write( - MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD), + MqttMessageBuilder.buildConnectAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, mqttVersion), false).then(mqttChannel.close()); } } catch (Exception e) { diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java index f30dc0208e235749b77df0392bfca0e6fdbb4c5b..086c987049d3960ec93cd0f42d93a0d164b7b89d 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/RetainMessageEntity.java @@ -24,6 +24,8 @@ public class RetainMessageEntity implements Serializable { private byte[] body; + private String userProperties; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java index 2966a12546e27774fa61ae8c630f5a11c8bbcd81..5a979bf46992e740061139a042d8d48e1224a0e8 100644 --- a/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java +++ b/smqtt-persistent/smqtt-persistent-redis/src/main/java/io/github/quickmsg/persistent/message/SessionMessageEntity.java @@ -25,6 +25,8 @@ public class SessionMessageEntity implements Serializable { private Boolean retain; + private String userProperties; + private byte[] body; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; diff --git a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java index 2b28c1e7300118dbe560a73bdef6a1f23472eddc..00519e79170ddbcb8d8455ed141e8f22d10323cd 100644 --- a/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java +++ b/smqtt-rule/smqtt-rule-engine/src/main/java/io/github/quickmsg/rule/node/TopicRuleNode.java @@ -58,7 +58,8 @@ public class TopicRuleNode implements RuleNode { MqttQoS.valueOf(heapMqttMessage.getQos()), 0, this.topic, - PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage())); + PooledByteBufAllocator.DEFAULT.buffer().writeBytes(heapMqttMessage.getMessage()), + heapMqttMessage.getProperties()); }