diff --git a/README.md b/README.md index f140b6c8751521360a0f3cef183ffe64026ab0d0..c43c1650dc06f9d63b6f7f40677f7801cc890011 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ SMQTT基于reactor-netty(spring-webflux底层依赖) 10. SPI接口扩展支持 - 消息管理接口(会话消息/保留消息管理) - 通道管理接口 (管理系统的客户端连接) - - 认证接口 (用于自定义外部认证) + - ~~认证接口 (用于自定义外部认证)~~ - 拦截器 (用户自定义拦截消息) 11. 集群支持(gossip协议实现) 12. 容器化支持 @@ -49,16 +49,25 @@ SMQTT基于reactor-netty(spring-webflux底层依赖) 17. grafana监控集成 - 支持influxdb - 支持prometheus -18. ACL权限控制 +18. ACL权限管理 + - 对设备、资访问授权 +19. 认证模块 + - 支持http + - 支持匿名 + - 支持固定密码 + - 支持sql + ## 尝试一下 > 大家不要恶意链接,谢谢! | 管理 | 说明 | 其他 | |----------------------------------------| ---- |---- | -| 113.90.145.99:18886 | mqtt端口 |用户名:smqtt 密码:smqtt | +| 113.90.145.99:18885 | mqtt端口 |用户名:smqtt 密码:smqtt | | 113.90.145.99:18888 | mqtt over websocket |用户名:smqtt 密码:smqtt | -| http://113.90.145.99:18887/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt | +| http://113.90.145.99:60000/smqtt/admin | 管理后台 |用户名:smqtt 密码:smqtt | +## 商业化版本 +如果遇到接入性能问题,或者需要定制化开发的,我们提供商业化版本出售,请添加微信17512575402! ## 商业化 @@ -279,9 +288,6 @@ docker run -it -v <配置文件路径目录>:/conf -p 1883:1883 -p 1999:1999 1 ![image](icon/netty.png) -### 商业化 - -商业化版本出售,请添加微信17512575402! ## License diff --git a/config/acl/basic_policy.csv b/config/acl/basic_policy.csv new file mode 100644 index 0000000000000000000000000000000000000000..ca87dd9f23c79be67351b0a9077de269b6f46fc5 --- /dev/null +++ b/config/acl/basic_policy.csv @@ -0,0 +1,4 @@ +p, client01, topicA, PUBLISH,allow +p, client02, topicB, PUBLISH,deny +p, ip{192.168.0.172/24}, topicB, PUBLISH,deny +p, all, topicB, SUBSCRIBE,allow diff --git a/config/config.yaml b/config/config.yaml index e2ba214bf9f9a8837f050daae4c5ec7274f0b50c..c86325e62139b4fa9db5b18ae848e1ae4e7f57ef 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -5,9 +5,9 @@ smqtt: notKickSecond: 30 # KICK互踢模式生效, 单位秒, 指定时间内客户端不互踢, 避免客户端自动连接持续互踢 port: 1883 # mqtt端口号 wiretap: false # 二进制日志 前提是 smqtt.logLevel = DEBUG - bossThreadSize: 8 # boss线程 默认=cpu核心数 - workThreadSize: 16 # work线程 默认=cpu核心数*2 - businessThreadSize: 32 # 业务线程数 默认=cpu核心数*10 + bossThreadSize: 1 # boss线程 默认=1 + workThreadSize: 9 # work线程 默认=cpu核心数+1 + businessThreadSize: 8 # 业务线程数 默认=cpu核心数 businessQueueSize: 100000 #业务队列 默认=100000 messageMaxSize: 4194304 # 接收消息的最大限制 默认4194304(4M) lowWaterMark: 4000000 # 不建议配置 默认 32768 @@ -21,11 +21,11 @@ smqtt: key: /user/server.key # 指定ssl文件 默认系统生成 crt: /user/server.crt # 指定ssl文件 默认系统生成 acl: - aclPolicy: NONE # NONE or FILE or JDBC - filePath: # FILE时配置filePath + aclPolicy: JDBC # NONE or FILE or JDBC + filePath: D:\smqtt\config\acl\basic_policy.csv # FILE时配置filePath jdbcAclConfig: driver: com.mysql.jdbc.Driver - url: jdbc:mysql://localhost:3306/smqtt + url: jdbc:mysql://113.90.145.99:18889/smqtt username: root password: 123 http: # http相关配置 端口固定60000 @@ -37,6 +37,10 @@ smqtt: enable: true # 开关 username: smqtt # 访问用户名 password: smqtt # 访问密码 + auth: + fixed: + username: smqtt + password: smqtt ws: # websocket配置 enable: true # 开关 port: 8999 # 端口 diff --git a/monitor/prometheus/smqtt-application-1638608184756.json b/config/monitor/prometheus/smqtt-application-1638608184756.json similarity index 100% rename from monitor/prometheus/smqtt-application-1638608184756.json rename to config/monitor/prometheus/smqtt-application-1638608184756.json diff --git a/monitor/prometheus/smqtt-jvm-1638607233202.json b/config/monitor/prometheus/smqtt-jvm-1638607233202.json similarity index 100% rename from monitor/prometheus/smqtt-jvm-1638607233202.json rename to config/monitor/prometheus/smqtt-jvm-1638607233202.json diff --git a/monitor/prometheus/smqtt-netty-1638607270130.json b/config/monitor/prometheus/smqtt-netty-1638607270130.json similarity index 100% rename from monitor/prometheus/smqtt-netty-1638607270130.json rename to config/monitor/prometheus/smqtt-netty-1638607270130.json diff --git a/pom.xml b/pom.xml index 0f3d1b3870a43a381390da026572f498d810d08e..b3874941746f4ddd5d3223c3c4ab06f6f7577659 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 io.github.quickmsg smqtt - 1.1.3 + 1.1.4 smqtt-common smqtt-core diff --git a/smqtt-bootstrap/pom.xml b/smqtt-bootstrap/pom.xml index e5797431b6ff5648571fff2eb6a6f148bb0a5a25..f0d58b92f97558ac19a9bd372660b69cacc5e595 100644 --- a/smqtt-bootstrap/pom.xml +++ b/smqtt-bootstrap/pom.xml @@ -7,10 +7,10 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-bootstrap - 1.1.3 + 1.1.4 smqtt-bootstrap http://www.example.com @@ -45,17 +45,17 @@ io.github.quickmsg smqtt-core - 1.1.3 + 1.1.4 smqtt-registry-scube io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-ui io.github.quickmsg - 1.1.3 + 1.1.4 diff --git a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java index e1078de376032b90283d81f5343d6797dbc26330..5c95e352dd87252e618fc8a851c8493820b41057 100644 --- a/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java +++ b/smqtt-bootstrap/src/main/java/io/github/quickmsg/AbstractStarter.java @@ -57,6 +57,7 @@ public abstract class AbstractStarter { .ruleChainDefinitions(config.getSmqttConfig().getRuleChainDefinitions()) .sourceDefinitions(config.getSmqttConfig().getRuleSources()) .aclConfig(config.getSmqttConfig().getAcl()) + .authConfig(config.getSmqttConfig().getAuthConfig()) .build() .doOnStarted(AbstractStarter::printUiUrl).startAwait(); diff --git a/smqtt-bootstrap/src/test/java/ClusterNode1.java b/smqtt-bootstrap/src/test/java/ClusterNode1.java index 398d2513ed976c7111128b2855e9e184f42d8230..a61473c0e45a41ad93eb3ed841e48a0be40ffffd 100644 --- a/smqtt-bootstrap/src/test/java/ClusterNode1.java +++ b/smqtt-bootstrap/src/test/java/ClusterNode1.java @@ -1,4 +1,5 @@ import ch.qos.logback.classic.Level; +import io.github.quickmsg.common.config.AuthConfig; import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.core.Bootstrap; diff --git a/smqtt-common/pom.xml b/smqtt-common/pom.xml index b4cd5cfc18821dc3891a9445283d92c090bd7d13..48321198b665d34b32553d36a48949f2030ef26e 100644 --- a/smqtt-common/pom.xml +++ b/smqtt-common/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 jar diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclAction.java b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclAction.java index 6ddab260e44fcf32066e9cb9d33629e5a0600c58..2a04a14be3bf48148703f76b4b5c842b4588a5ba 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclAction.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclAction.java @@ -4,10 +4,7 @@ package io.github.quickmsg.common.acl; * @author luxurong */ public enum AclAction { - /** - * mqtt connect - */ - CONNECT, + /** * mqtt sub */ @@ -16,5 +13,9 @@ public enum AclAction { /** * mqtt pub */ - PUBLISH + PUBLISH, + /** + * 新增ACL配置时, 一次性添加以上三种动作 + */ + ALL } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclManager.java index 202cec0baeed91f9ff838eafff82e4edc0e3e714..9bb4979eb94ed097331ea898fefe706b1e00e930 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclManager.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclManager.java @@ -1,6 +1,7 @@ package io.github.quickmsg.common.acl; import io.github.quickmsg.common.acl.model.PolicyModel; +import io.github.quickmsg.common.channel.MqttChannel; import java.util.List; @@ -9,11 +10,13 @@ import java.util.List; */ public interface AclManager { - boolean auth(String sub, String source, AclAction action); + boolean check(MqttChannel mqttChannel, String source, AclAction action); - boolean add(String sub, String source, AclAction action); + boolean add(String sub,String source,AclAction action,AclType type); - boolean delete(String sub, String source, AclAction action); + + boolean delete(String sub,String source,AclAction action,AclType type); List> get(PolicyModel policyModel); + } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclType.java b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclType.java new file mode 100644 index 0000000000000000000000000000000000000000..a5cf84e83a03897a77bc486d7267fc7c897d65d5 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/AclType.java @@ -0,0 +1,35 @@ +package io.github.quickmsg.common.acl; + + +import lombok.Getter; + +/** + * @author luxurong + */ +public enum AclType { + + ALL(""), + + + ALLOW("allow"), + + DENY("deny"); + + @Getter + private final String desc; + + AclType(String desc) { + this.desc = desc; + } + + public static AclType fromDesc(String desc) { + for (AclType type : AclType.values()) { + if (type.desc.equals(desc)) { + return type; + } + } + return null; + } + + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/filter/AclFunction.java b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/filter/AclFunction.java new file mode 100644 index 0000000000000000000000000000000000000000..6f41fddd4eeda804fd3363008822312b5defb959 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/filter/AclFunction.java @@ -0,0 +1,42 @@ +package io.github.quickmsg.common.acl.filter; + +import com.googlecode.aviator.runtime.function.FunctionUtils; +import com.googlecode.aviator.runtime.type.AviatorBoolean; +import com.googlecode.aviator.runtime.type.AviatorObject; +import org.casbin.jcasbin.util.BuiltInFunctions; +import org.casbin.jcasbin.util.function.CustomFunction; + +import java.util.Map; + +/** + * ip{} + * id{} + * all + * + * @author luxurong + */ +public class AclFunction extends CustomFunction { + + + @Override + public AviatorObject call(Map env, AviatorObject arg1, AviatorObject arg2) { + String requestSubject = FunctionUtils.getStringValue(arg1, env); + String subject = FunctionUtils.getStringValue(arg2, env); + if (subject.startsWith("ip")) { + int startIndex = subject.indexOf("{"); + int endIndex = subject.indexOf("}"); + String ip = requestSubject.split(":")[1]; + return AviatorBoolean.valueOf(BuiltInFunctions.ipMatch(ip, subject.substring(startIndex + 1, endIndex))); + } else if (subject.equals("all")) { + return AviatorBoolean.valueOf(true); + } else { + String clientId = requestSubject.split(":")[0]; + return AviatorBoolean.valueOf(clientId.equals(subject)); + } + } + + @Override + public String getName() { + return "filter"; + } +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/model/PolicyModel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/model/PolicyModel.java index 19e79559129ddf55b3a04b23504a51282be1137c..117b3bacef866b4059a903708e62b7a2ff42b45b 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/acl/model/PolicyModel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/acl/model/PolicyModel.java @@ -1,6 +1,7 @@ package io.github.quickmsg.common.acl.model; import io.github.quickmsg.common.acl.AclAction; +import io.github.quickmsg.common.acl.AclType; import lombok.Data; /** @@ -15,4 +16,6 @@ public class PolicyModel { private AclAction action; + private AclType aclType; + } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/auth/AuthBean.java b/smqtt-common/src/main/java/io/github/quickmsg/common/auth/AuthBean.java new file mode 100644 index 0000000000000000000000000000000000000000..1f8c858fd504b6d21ceb226839339aaf7f717e4f --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/auth/AuthBean.java @@ -0,0 +1,17 @@ +package io.github.quickmsg.common.auth; + +import lombok.Data; + +/** + * @author luxurong + */ +@Data +public class AuthBean { + + private String clientId; + + private String username; + + private String password; + +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/auth/PasswordAuthentication.java b/smqtt-common/src/main/java/io/github/quickmsg/common/auth/AuthManager.java similarity index 46% rename from smqtt-common/src/main/java/io/github/quickmsg/common/auth/PasswordAuthentication.java rename to smqtt-common/src/main/java/io/github/quickmsg/common/auth/AuthManager.java index fcd5fc31d55a86bbe6747690a442e9adc265b324..ea5e1be69b649a8a7c8e6752aa3d86f240eedb5e 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/auth/PasswordAuthentication.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/auth/AuthManager.java @@ -1,14 +1,9 @@ package io.github.quickmsg.common.auth; -import io.github.quickmsg.common.StartUp; -import io.github.quickmsg.common.spi.DynamicLoader; - /** * @author luxurong */ -public interface PasswordAuthentication extends StartUp { - - PasswordAuthentication INSTANCE = DynamicLoader.findFirst(PasswordAuthentication.class).orElse(null); +public interface AuthManager { /** * 认证接口 @@ -18,6 +13,6 @@ public interface PasswordAuthentication extends StartUp { * @param clientIdentifier 设备标志 * @return 布尔 */ - boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier); + Boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java index 2cecbdc178b4ecd8c28dfc854d5baf99b2536fcf..2ee7018baf8d5d27a6a3f038bfc3fb52d1ff7dbc 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/channel/MqttChannel.java @@ -1,9 +1,9 @@ package io.github.quickmsg.common.channel; import com.fasterxml.jackson.annotation.JsonIgnore; -import io.github.quickmsg.common.ack.Ack; -import io.github.quickmsg.common.ack.RetryAck; -import io.github.quickmsg.common.ack.TimeAckManager; +import io.github.quickmsg.common.retry.Ack; +import io.github.quickmsg.common.retry.RetryAck; +import io.github.quickmsg.common.retry.TimeAckManager; import io.github.quickmsg.common.enums.ChannelStatus; import io.github.quickmsg.common.topic.SubscribeTopic; import io.github.quickmsg.common.utils.MessageUtils; @@ -101,7 +101,8 @@ public class MqttChannel { mqttChannel.setQos2MsgCache(new ConcurrentHashMap<>()); mqttChannel.setConnection(connection); mqttChannel.setStatus(ChannelStatus.INIT); - mqttChannel.setAddress(connection.address().toString()); + mqttChannel.setAddress(connection.address().toString() + .replaceAll("/", "")); mqttChannel.setTimeAckManager(timeAckManager); return mqttChannel; } @@ -186,7 +187,7 @@ public class MqttChannel { public long generateId(MqttMessageType type, Integer messageId) { - return (long) connection.channel().hashCode() << 5 | (long) type.value() << 4 | messageId; + return (long) connection.channel().hashCode() << 32 | (long) type.value() << 28 | messageId<<4>>>4; } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/AbstractConfiguration.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/AbstractConfiguration.java index b73e39ee72c7175bfec3af50700d451a460e3f5e..23997a173b460a8fe898b94b7fad0f8efa4e7dbe 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/AbstractConfiguration.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/AbstractConfiguration.java @@ -1,6 +1,5 @@ package io.github.quickmsg.common.config; -import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.RuleDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; @@ -51,5 +50,6 @@ public interface AbstractConfiguration extends Configuration { Map getEnvironmentMap(); + AuthConfig getAuthConfig(); } diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/AuthConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/AuthConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..977b49c753907a5af76578fec6889054166fe1d1 --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/AuthConfig.java @@ -0,0 +1,62 @@ +package io.github.quickmsg.common.config; + +import lombok.Data; + +import java.util.Map; + +/** + * @author luxurong + */ +@Data +public class AuthConfig { + + private SqlAuthConfig sql; + + private HttpAuthConfig http; + + private FixedAuthConfig fixed; + + private String file; + + @Data + public static class FixedAuthConfig { + + private String username; + + private String password; + + } + + @Data + public static class HttpAuthConfig { + + private String host; + + private int port; + + private String path; + + private String method; + + private Map headers; + + private Map params; + + } + + + @Data + public static class SqlAuthConfig { + + private String driver; + + private String url; + + private String username; + + private String password; + + private String authSql; + + } +} diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java index ad2fe78bf51dc941625bf63d4ed07a0c5187c7d6..bcee19a1453dd5d11329cec8465276c69911d3ae 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/BootstrapConfig.java @@ -2,7 +2,6 @@ package io.github.quickmsg.common.config; import ch.qos.logback.classic.Level; import com.fasterxml.jackson.annotation.JsonProperty; -import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.metric.MeterType; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; @@ -109,6 +108,13 @@ public class BootstrapConfig { */ @JsonProperty("acl") private AclConfig acl; + + + /** + * auth配置 + */ + @JsonProperty("auth") + private AuthConfig authConfig; } @Data @@ -192,11 +198,6 @@ public class BootstrapConfig { */ Map childOptions; - /** - * PasswordAuthentication - */ - PasswordAuthentication authentication; - } @Data diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java b/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java index af55fc6e823d951fe3a0cc73e3c318c4097a1b3c..d3492045182613d921d415daaed18e5e3751cf79 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/config/SslContext.java @@ -1,5 +1,6 @@ package io.github.quickmsg.common.config; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @@ -8,14 +9,17 @@ import lombok.NoArgsConstructor; * @author luxurong */ @Data -@NoArgsConstructor @Builder +@NoArgsConstructor +@AllArgsConstructor public class SslContext { private String crt; private String key; + private String ca; + private Boolean enable; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java index 70046540adcc9aef580ff3ecacdfbc0a8158bfa9..81a7ace7a712ee6beb991a0060ff831f6925b96d 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/context/ReceiveContext.java @@ -1,6 +1,6 @@ package io.github.quickmsg.common.context; -import io.github.quickmsg.common.ack.TimeAckManager; +import io.github.quickmsg.common.retry.TimeAckManager; import io.github.quickmsg.common.acl.AclManager; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.MqttChannel; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/metric/MetricManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/metric/MetricManager.java index ea05e5940f775ed759bd005feda2e3e4bce067ef..4519c81e99925aa06634fcb33d6f55b58074c4e0 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/metric/MetricManager.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/metric/MetricManager.java @@ -38,7 +38,7 @@ public interface MetricManager { MemoryMXBean mxb = ManagementFactory.getMemoryMXBean(); ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); RuntimeMXBean runtimeBean = ManagementFactory.getRuntimeMXBean(); - metrics.put("smqtt", "1.1.1"); + metrics.put("smqtt", "1.1.4"); metrics.put("start_time", sdf.format(new Date(runtimeBean.getStartTime()))); metrics.put("jdk_home", props.getProperty("java.home")); metrics.put("jdk_version", props.getProperty("java.version")); diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/AbsAck.java similarity index 95% rename from smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java rename to smqtt-common/src/main/java/io/github/quickmsg/common/retry/AbsAck.java index 8ce52990f410c33513ce3db9593b28ca7cf78e82..706690bdcfb4be23dcb13a0ae52b91a0b1acbb2f 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AbsAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/AbsAck.java @@ -1,10 +1,9 @@ -package io.github.quickmsg.common.ack; +package io.github.quickmsg.common.retry; import io.netty.util.Timeout; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; /** * @author luxurong diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/Ack.java similarity index 86% rename from smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java rename to smqtt-common/src/main/java/io/github/quickmsg/common/retry/Ack.java index 822157c091bf697c3a31a2f9bd90d19e221bef96..26fba301b7c42a421b2f262a08e0617ec5612b3d 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/Ack.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/Ack.java @@ -1,4 +1,4 @@ -package io.github.quickmsg.common.ack; +package io.github.quickmsg.common.retry; import io.netty.util.TimerTask; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/AckManager.java similarity index 78% rename from smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java rename to smqtt-common/src/main/java/io/github/quickmsg/common/retry/AckManager.java index 12a5f956f229c7c8520022cd8549ec1b9935b419..1135cbabc413a7a3a7cc7cb8d3d3589b0e4a8eed 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/AckManager.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/AckManager.java @@ -1,4 +1,4 @@ -package io.github.quickmsg.common.ack; +package io.github.quickmsg.common.retry; /** * @author luxurong diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/RetryAck.java similarity index 90% rename from smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java rename to smqtt-common/src/main/java/io/github/quickmsg/common/retry/RetryAck.java index ef4ca1c802e43ca61b86eb5ffeb50733afea3acc..86ccf1e0433cbc8c146d9c7da256c42dd69198a7 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/RetryAck.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/RetryAck.java @@ -1,4 +1,4 @@ -package io.github.quickmsg.common.ack; +package io.github.quickmsg.common.retry; /** * @author luxurong diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/TimeAckManager.java similarity index 95% rename from smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java rename to smqtt-common/src/main/java/io/github/quickmsg/common/retry/TimeAckManager.java index c1739a5d667f86c1eb0ee07d54fe0910220e0dcb..cea9942ad252e100af2703f4fc30974559fab30b 100644 --- a/smqtt-common/src/main/java/io/github/quickmsg/common/ack/TimeAckManager.java +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/retry/TimeAckManager.java @@ -1,4 +1,4 @@ -package io.github.quickmsg.common.ack; +package io.github.quickmsg.common.retry; import io.netty.util.HashedWheelTimer; diff --git a/smqtt-common/src/main/java/io/github/quickmsg/common/utils/CsvReader.java b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/CsvReader.java new file mode 100644 index 0000000000000000000000000000000000000000..6d4fcd55b8da08d1e6083acff8003496d1d2829c --- /dev/null +++ b/smqtt-common/src/main/java/io/github/quickmsg/common/utils/CsvReader.java @@ -0,0 +1,48 @@ +package io.github.quickmsg.common.utils; + +import lombok.extern.slf4j.Slf4j; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author luxurong + */ +@Slf4j +public class CsvReader { + + public static List> readCsvValues(String filePath) { + File csv = new File(filePath); + BufferedReader br = null; + try { + br = new BufferedReader(new InputStreamReader(new FileInputStream(csv), StandardCharsets.UTF_8)); + } catch (Exception e) { + e.printStackTrace(); + } + if (br != null) { + String line = ""; + List> records = new ArrayList<>(); + try { + while ((line = br.readLine()) != null) { + List lines=buildLineList(line); + records.add(lines); + } + return records; + } catch (IOException e) { + log.error("read auth error"); + e.printStackTrace(); + } + } + return Collections.emptyList(); + } + + private static List buildLineList(String line) { + return Arrays.stream(line.split(",")) + .collect(Collectors.toList()); + } +} diff --git a/smqtt-core/pom.xml b/smqtt-core/pom.xml index 1e999a8770b091a8d20bb1bcb4040ba8039d7bb6..22f6b5a7cd8409c9140f66d87dd7502aadaaf962 100644 --- a/smqtt-core/pom.xml +++ b/smqtt-core/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 smqtt-core @@ -14,22 +14,22 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-rule-dsl - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-metric-influxdb - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-metric-prometheus - 1.1.3 + 1.1.4 diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java index 2513c5177339ff0c03cc5a27313b8d74743e84e8..ac2f762cb3e811754e19f928accc7d488ca444eb 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/Bootstrap.java @@ -2,6 +2,7 @@ package io.github.quickmsg.core; import ch.qos.logback.classic.Level; import io.github.quickmsg.common.config.AclConfig; +import io.github.quickmsg.common.config.AuthConfig; import io.github.quickmsg.common.config.BootstrapConfig; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.common.rule.RuleChainDefinition; @@ -56,6 +57,8 @@ public class Bootstrap { private AclConfig aclConfig; + private AuthConfig authConfig; + private final List> transports = new ArrayList<>(); @Builder.Default @@ -89,6 +92,7 @@ public class Bootstrap { Optional.ofNullable(clusterConfig).ifPresent(mqttConfiguration::setClusterConfig); Optional.ofNullable(meterConfig).ifPresent(mqttConfiguration::setMeterConfig); Optional.ofNullable(aclConfig).ifPresent(mqttConfiguration::setAclConfig); + Optional.ofNullable(authConfig).ifPresent(mqttConfiguration::setAuthConfig); if (websocketConfig != null && websocketConfig.isEnable()) { mqttConfiguration.setWebSocketPort(websocketConfig.getPort()); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/acl/JCasBinAclManager.java b/smqtt-core/src/main/java/io/github/quickmsg/core/acl/JCasBinAclManager.java index b66d9c0b789304dba4cdcd7a63718cd73a7a9867..a7992c628062f70eea9c16c771762c1b338e1f99 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/acl/JCasBinAclManager.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/acl/JCasBinAclManager.java @@ -3,18 +3,21 @@ package io.github.quickmsg.core.acl; import io.github.quickmsg.common.acl.AclAction; import io.github.quickmsg.common.acl.AclManager; import io.github.quickmsg.common.acl.AclPolicy; -import io.github.quickmsg.common.config.AclConfig; +import io.github.quickmsg.common.acl.AclType; +import io.github.quickmsg.common.acl.filter.AclFunction; import io.github.quickmsg.common.acl.model.PolicyModel; +import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.config.AclConfig; +import io.github.quickmsg.common.utils.TopicRegexUtils; import lombok.extern.slf4j.Slf4j; import org.casbin.adapter.JDBCAdapter; import org.casbin.jcasbin.main.Enforcer; import org.casbin.jcasbin.model.Model; import org.casbin.jcasbin.persist.file_adapter.FileAdapter; +import org.casbin.jcasbin.util.BuiltInFunctions; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; /** * @author luxurong @@ -24,53 +27,86 @@ public class JCasBinAclManager implements AclManager { private Enforcer enforcer; + private final Map> filterAclTopicActions = new ConcurrentHashMap<>(); + + private final String REQUEST_SUBJECT_TEMPLATE = "%s:%s"; + public JCasBinAclManager(AclConfig aclConfig) { if (aclConfig != null) { Model model = new Model(); model.addDef("r", "r", "sub, obj, act"); - model.addDef("p", "p", "sub, obj, act"); - model.addDef("e", "e", "some(where (p.eft == allow))"); - model.addDef("m", "m", "r.sub == p.sub && r.obj == p.obj && r.act == p.act"); + model.addDef("p", "p", " sub, obj, act, eft"); + model.addDef("g", "g", "_, _"); + model.addDef("e", "e", "some(where (p.eft == allow)) && !some(where (p.eft == deny))"); + model.addDef("m", "m", "r.act == p.act && keyMatch2(r.obj,p.obj) && filter(r.sub, p.sub)"); if (aclConfig.getAclPolicy() == AclPolicy.JDBC) { AclConfig.JdbcAclConfig jdbcAclConfig = aclConfig.getJdbcAclConfig(); Objects.requireNonNull(jdbcAclConfig); try { - enforcer = new Enforcer(model, new JDBCAdapter(jdbcAclConfig.getDriver(), jdbcAclConfig.getUrl(), jdbcAclConfig.getUsername(), jdbcAclConfig.getPassword())); + enforcer = new Enforcer(model, new JDBCAdapter(jdbcAclConfig.getDriver(), jdbcAclConfig.getUrl(), + jdbcAclConfig.getUsername(), jdbcAclConfig.getPassword())); } catch (Exception e) { log.error("init acl jdbc error {}", aclConfig, e); } } else if (aclConfig.getAclPolicy() == AclPolicy.FILE) { enforcer = new Enforcer(model, new FileAdapter(aclConfig.getFilePath())); + } else { + enforcer = new Enforcer(); + } + enforcer.addFunction("filter", new AclFunction()); + List objects = enforcer.getAllObjects(); + List actions = enforcer.getAllActions(); + for (int i = 0; i < objects.size(); i++) { + Set allObjects = filterAclTopicActions.computeIfAbsent(actions.get(i), a -> new HashSet<>()); + allObjects.add(objects.get(i)); } } } @Override - public boolean auth(String sub, String source, AclAction action) { - return Optional.ofNullable(enforcer) - .map(ef -> enforcer.enforce(sub, source, action.name())) - .orElse(true); + public boolean check(MqttChannel mqttChannel, String source, AclAction action) { + try { + boolean isCheckAcl = Optional.ofNullable(filterAclTopicActions.get(action.name())) + .map(objects -> objects.stream().anyMatch(topic->BuiltInFunctions.keyMatch2(source,topic))) + .orElse(false); + if (isCheckAcl) { + String subject = String.format(REQUEST_SUBJECT_TEMPLATE, mqttChannel.getClientIdentifier() + , mqttChannel.getAddress().split(":")[0]); + return Optional.ofNullable(enforcer) + .map(ef -> ef.enforce(subject, source, action.name())) + .orElse(true); + } + + } catch (Exception e) { + log.error("acl check error",e); + } + return true; } @Override - public boolean add(String sub, String source, AclAction action) { + public boolean add(String sub, String source, AclAction action, AclType type) { return Optional.ofNullable(enforcer) - .map(ef -> enforcer.addNamedPolicy("p", sub, source, action.name())) + .map(ef -> enforcer.addNamedPolicy("p", sub, source, action.name(),type.getDesc())) .orElse(true); } @Override - public boolean delete(String sub, String source, AclAction action) { + public boolean delete(String sub, String source, AclAction action,AclType type) { return Optional.ofNullable(enforcer) - .map(ef -> enforcer.removeNamedPolicy("p", sub, source, action.name())) + .map(ef -> enforcer.removeNamedPolicy("p", sub, source, action.name(),type.getDesc())) .orElse(true); } @Override public List> get(PolicyModel policyModel) { return Optional.ofNullable(enforcer) - .map(ef -> enforcer.getFilteredNamedPolicy("p", 0, policyModel.getSubject(), policyModel.getSource(), policyModel.getAction() == null ? "" : policyModel.getAction().name())) + .map(ef -> enforcer + .getFilteredNamedPolicy("p", 0, + policyModel.getSubject(), policyModel.getSource(), + policyModel.getAction() == null || AclAction.ALL == policyModel.getAction() ? "" : policyModel.getAction().name(), + policyModel.getAclType()==null || AclType.ALL == policyModel.getAclType() ?"":policyModel.getAclType().getDesc()) + ) .orElse(Collections.emptyList()); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/auth/AuthManagerFactory.java b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/AuthManagerFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..38146bd3e2e9a926141c7f8741f15f9f684323c2 --- /dev/null +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/AuthManagerFactory.java @@ -0,0 +1,33 @@ +package io.github.quickmsg.core.auth; + +import io.github.quickmsg.common.auth.AuthManager; +import io.github.quickmsg.common.config.AuthConfig; + +/** + * @author luxurong + */ +public class AuthManagerFactory { + + private final AuthConfig authConfig; + + public AuthManagerFactory(AuthConfig authConfig) { + this.authConfig = authConfig; + } + + public AuthManager getAuthManager() { + if (authConfig == null) { + return new NoneAuthManager(); + } + if (authConfig.getHttp() != null) { + return new HttpAuthManager(authConfig); + } else if (authConfig.getFile() != null) { + return new FileAuthManager(authConfig); + } else if (authConfig.getFixed() != null) { + return new FixedAuthManager(authConfig); + } else if (authConfig.getSql() != null) { + return new SqlAuthManager(authConfig); + } else { + return new NoneAuthManager(); + } + } +} \ No newline at end of file diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/auth/FileAuthManager.java b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/FileAuthManager.java new file mode 100644 index 0000000000000000000000000000000000000000..f93a7949c4fc73f261b2282bd8d0314ecd2e5eeb --- /dev/null +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/FileAuthManager.java @@ -0,0 +1,42 @@ +package io.github.quickmsg.core.auth; + +import io.github.quickmsg.common.auth.AuthBean; +import io.github.quickmsg.common.auth.AuthManager; +import io.github.quickmsg.common.config.AuthConfig; +import io.github.quickmsg.common.utils.CsvReader; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * @author luxurong + */ +public class FileAuthManager implements AuthManager { + + private final AuthConfig authConfig; + + private Map authBeanMap = new HashMap<>(); + + public FileAuthManager(AuthConfig authConfig) { + this.authConfig = authConfig; + List> values = CsvReader.readCsvValues(authConfig.getFile()); + for (List es : values) { + AuthBean authBean = new AuthBean(); + authBean.setClientId(es.get(0)); + authBean.setUsername(es.get(1)); + authBean.setPassword(es.get(2)); + authBeanMap.put(authBean.getClientId(),authBean); + } + } + + @Override + public Boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier) { + return Optional.ofNullable(authBeanMap.get(clientIdentifier)) + .map(authBean -> authBean.getUsername().equals(userName) && authBean.getPassword().equals(new String(passwordInBytes))) + .orElse(false); + } + + +} diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/auth/FixedAuthManager.java b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/FixedAuthManager.java new file mode 100644 index 0000000000000000000000000000000000000000..fcb00635dbb31201594ab99f1e388e1caa52c682 --- /dev/null +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/FixedAuthManager.java @@ -0,0 +1,22 @@ +package io.github.quickmsg.core.auth; + +import io.github.quickmsg.common.auth.AuthManager; +import io.github.quickmsg.common.config.AuthConfig; + +/** + * @author luxurong + */ +public class FixedAuthManager implements AuthManager { + + private final AuthConfig authConfig; + + public FixedAuthManager(AuthConfig authConfig) { + this.authConfig = authConfig; + } + + @Override + public Boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier) { + return authConfig.getFixed().getUsername().equals(userName) + && authConfig.getFixed().getPassword().equals(new String(passwordInBytes)); + } +} diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/auth/HttpAuthManager.java b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/HttpAuthManager.java new file mode 100644 index 0000000000000000000000000000000000000000..29676dbe8ca506fd817cdcbb64bf5de1cf4b0e9f --- /dev/null +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/HttpAuthManager.java @@ -0,0 +1,52 @@ +package io.github.quickmsg.core.auth; + +import io.github.quickmsg.common.auth.AuthManager; +import io.github.quickmsg.common.config.AuthConfig; +import io.github.quickmsg.common.utils.JacksonUtil; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import reactor.core.publisher.Mono; +import reactor.netty.ByteBufFlux; +import reactor.netty.http.client.HttpClient; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * @author luxurong + */ +public class HttpAuthManager implements AuthManager { + + private final AuthConfig authConfig; + + private final HttpClient client; + + public HttpAuthManager(AuthConfig authConfig) { + this.authConfig = authConfig; + AuthConfig.HttpAuthConfig httpAuthConfig = authConfig.getHttp(); + this.client = HttpClient.create().host(httpAuthConfig.getHost()).port(httpAuthConfig.getPort()) + .headers(headers -> { + headers.add(HttpHeaderNames.CONTENT_TYPE.toString(), "application/json;utf-8"); + Optional.ofNullable(httpAuthConfig.getHeaders()) + .ifPresent(addHeaders -> addHeaders.forEach(headers::add)); + }); + } + @Override + public Boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier) { + AuthConfig.HttpAuthConfig httpAuthConfig = authConfig.getHttp(); + Map params = new HashMap<>(); + params.put("clientIdentifier", clientIdentifier); + params.put("username", userName); + params.put("password", new String(passwordInBytes, StandardCharsets.UTF_8)); + params.putAll(httpAuthConfig.getParams()); + return client.post().uri(httpAuthConfig.getPath()) + .send(ByteBufFlux.fromString(Mono.just(JacksonUtil.map2Json(params)))) + .response() + .map(response -> HttpResponseStatus.OK == response.status()) + .block(Duration.ofSeconds(3)); + } + +} diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/auth/NoneAuthManager.java b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/NoneAuthManager.java new file mode 100644 index 0000000000000000000000000000000000000000..bdf6a6b4006694f7e12bca7a77d0dd4b1a449082 --- /dev/null +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/NoneAuthManager.java @@ -0,0 +1,13 @@ +package io.github.quickmsg.core.auth; + +import io.github.quickmsg.common.auth.AuthManager; + +/** + * @author luxurong + */ +public class NoneAuthManager implements AuthManager { + @Override + public Boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier) { + return true; + } +} diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/auth/SqlAuthManager.java b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/SqlAuthManager.java new file mode 100644 index 0000000000000000000000000000000000000000..02e065e125e357fbafec6cd47336886fce3cae2d --- /dev/null +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/auth/SqlAuthManager.java @@ -0,0 +1,71 @@ +package io.github.quickmsg.core.auth; + +import io.github.quickmsg.common.auth.AuthManager; +import io.github.quickmsg.common.config.AuthConfig; +import io.github.quickmsg.source.db.config.HikariCPConnectionProvider; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Properties; + +/** + * @author luxurong + */ +@Slf4j +public class SqlAuthManager implements AuthManager { + + private AuthConfig authConfig; + + public SqlAuthManager(AuthConfig authConfig) { + this.authConfig = authConfig; + // 初始化数据库连接池 + Properties properties = new Properties(); + properties.put("jdbcUrl", authConfig.getSql().getUrl()); + properties.put("username", authConfig.getSql().getUsername()); + properties.put("password", authConfig.getSql().getPassword()); + + HikariCPConnectionProvider + .singleTon() + .init(properties); + } + + public Boolean auth(String userName, byte[] passwordInBytes, String clientIdentifier) { + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + try { + conn = HikariCPConnectionProvider.singleTon().getConnection(); + ps = conn.prepareStatement(authConfig.getSql().getAuthSql()); + ps.setString(1, userName); + ps.setString(2, new String(passwordInBytes, StandardCharsets.UTF_8)); + ps.setString(3, clientIdentifier); + + rs = ps.executeQuery(); + if (rs.next()) { + return true; + } + } catch (SQLException e) { + log.error("auth error clientIdentifier={}", clientIdentifier, e); + } finally { + try { + if (rs != null) { + rs.close(); + } + if (ps != null) { + ps.close(); + } + if (conn != null) { + conn.close(); + } + } catch (SQLException e) { + log.error("close error clientIdentifier={}", clientIdentifier, e); + } + } + + return false; + } +} diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclAddPolicyActor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclAddPolicyActor.java index a3ed5664aaf237b3de57d7dd5fd6bb2d1df8e681..f8fa40bb5b43d691879782f59e62fd4aedccd659 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclAddPolicyActor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclAddPolicyActor.java @@ -1,5 +1,7 @@ package io.github.quickmsg.core.http.acl; +import io.github.quickmsg.common.acl.AclAction; +import io.github.quickmsg.common.acl.model.PolicyModel; import io.github.quickmsg.common.annotation.AllowCors; import io.github.quickmsg.common.annotation.Header; import io.github.quickmsg.common.annotation.Router; @@ -7,7 +9,6 @@ import io.github.quickmsg.common.config.Configuration; import io.github.quickmsg.common.context.ContextHolder; import io.github.quickmsg.common.enums.HttpType; import io.github.quickmsg.core.http.AbstractHttpActor; -import io.github.quickmsg.common.acl.model.PolicyModel; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; @@ -32,8 +33,15 @@ public class AclAddPolicyActor extends AbstractHttpActor { .receive() .asString(StandardCharsets.UTF_8) .map(this.toJson(PolicyModel.class)) - .doOnNext(policyModel -> - ContextHolder.getReceiveContext().getAclManager().add(policyModel.getSubject(), policyModel.getSource(), policyModel.getAction()) + .doOnNext(policyModel -> { + if (policyModel.getAction() == AclAction.ALL) { + ContextHolder.getReceiveContext().getAclManager().add + (policyModel.getSubject(), policyModel.getSource(), AclAction.SUBSCRIBE,policyModel.getAclType()); + ContextHolder.getReceiveContext().getAclManager().add(policyModel.getSubject(), policyModel.getSource(), AclAction.PUBLISH,policyModel.getAclType()); + } else { + ContextHolder.getReceiveContext().getAclManager().add(policyModel.getSubject(), policyModel.getSource(), policyModel.getAction(),policyModel.getAclType()); + } + } ).then(response.sendString(Mono.just("success")).then()); } } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclDeletePolicyActor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclDeletePolicyActor.java index 40c2ad2187c8136324672123f3ae4c43e7dfa581..1fe8112fd155efa64fff5fb82e5df57967005d6f 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclDeletePolicyActor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclDeletePolicyActor.java @@ -1,5 +1,6 @@ package io.github.quickmsg.core.http.acl; +import io.github.quickmsg.common.acl.model.PolicyModel; import io.github.quickmsg.common.annotation.AllowCors; import io.github.quickmsg.common.annotation.Header; import io.github.quickmsg.common.annotation.Router; @@ -7,7 +8,6 @@ import io.github.quickmsg.common.config.Configuration; import io.github.quickmsg.common.context.ContextHolder; import io.github.quickmsg.common.enums.HttpType; import io.github.quickmsg.core.http.AbstractHttpActor; -import io.github.quickmsg.common.acl.model.PolicyModel; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; @@ -33,7 +33,7 @@ public class AclDeletePolicyActor extends AbstractHttpActor { .asString(StandardCharsets.UTF_8) .map(this.toJson(PolicyModel.class)) .doOnNext(policyModel -> - ContextHolder.getReceiveContext().getAclManager().delete(policyModel.getSubject(), policyModel.getSource(), policyModel.getAction()) + ContextHolder.getReceiveContext().getAclManager().delete(policyModel.getSubject(), policyModel.getSource(), policyModel.getAction(),policyModel.getAclType()) ).then(response.sendString(Mono.just("success")).then()); } } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclQueryPolicyActor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclQueryPolicyActor.java index 0fe94f457f4c8a003b025bec2c22325fe7997858..52b544a33b1260adcd0d162dfefb7b67c010a752 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclQueryPolicyActor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/acl/AclQueryPolicyActor.java @@ -1,5 +1,6 @@ package io.github.quickmsg.core.http.acl; +import io.github.quickmsg.common.acl.AclType; import io.github.quickmsg.common.acl.model.PolicyModel; import io.github.quickmsg.common.annotation.AllowCors; import io.github.quickmsg.common.annotation.Header; @@ -39,6 +40,7 @@ public class AclQueryPolicyActor extends AbstractHttpActor { map.put("subject", item.size() >= 3 ? item.get(0) : null); map.put("source", item.size() >= 3 ? item.get(1) : null); map.put("action", item.size() >= 3 ? item.get(2) : null); + map.put("aclType", item.size() >= 4 ? AclType.fromDesc(item.get(3)).name() : null); return map; }).collect(Collectors.toList()); response.sendString(Mono.just(JacksonUtil.bean2Json(collect))).then().subscribe(); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/CloseConnectionActor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/CloseConnectionActor.java new file mode 100644 index 0000000000000000000000000000000000000000..d86ab71395b3cf15f84975d828eaef3eb2b81f66 --- /dev/null +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/CloseConnectionActor.java @@ -0,0 +1,66 @@ +package io.github.quickmsg.core.http.actors; + +import io.github.quickmsg.common.annotation.AllowCors; +import io.github.quickmsg.common.annotation.Header; +import io.github.quickmsg.common.annotation.Router; +import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.config.Configuration; +import io.github.quickmsg.common.context.ContextHolder; +import io.github.quickmsg.common.enums.HttpType; +import io.github.quickmsg.common.utils.JacksonUtil; +import io.github.quickmsg.core.http.AbstractHttpActor; +import io.github.quickmsg.core.http.HttpConfiguration; +import io.github.quickmsg.core.http.model.LoginDo; +import io.github.quickmsg.core.http.model.LoginVm; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * @author luxurong + */ +@Router(value = "/smqtt/close/connection", type = HttpType.POST) +@Slf4j +@Header(key = "Content-Type", value = "application/json") +@AllowCors +public class CloseConnectionActor extends AbstractHttpActor { + + @Override + public Publisher doRequest(HttpServerRequest request, HttpServerResponse response, Configuration httpConfiguration) { + return request + .receive() + .asString(StandardCharsets.UTF_8) + .map(this.toJson(Close.class)) + .doOnNext(close -> { + if(CollectionUtils.isNotEmpty(close.getIds())){ + close.getIds().forEach(id->{ + MqttChannel mqttChannel=ContextHolder.getReceiveContext() + .getChannelRegistry() + .get(id); + if(mqttChannel!=null){ + mqttChannel.close().subscribe(); + } + }); + } + }).then(); + } + + + @Data + public static class Close{ + private List ids; + } + + + +} diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ConnectionActor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ConnectionActor.java index bd53950082cb200bb6b5ed902a0d4585673391f7..e53deb889106020f1f9a5917f42214ff44cef24e 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ConnectionActor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/http/actors/ConnectionActor.java @@ -35,7 +35,7 @@ public class ConnectionActor extends AbstractHttpActor { ContextHolder.getReceiveContext().getChannelRegistry().getChannels() .stream() .map(record -> { - record.setAddress(record.getAddress().replaceAll("/", "")); + record.setAddress(record.getAddress()); return record; }).collect(Collectors.toList()) ))) diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java index 60c7991bab5f42507ba41f5bd43c3582eaeb62ef..0cc9a6e90565b4e612d4f4203a327f881d68bda4 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/AbstractReceiveContext.java @@ -1,15 +1,12 @@ package io.github.quickmsg.core.mqtt; -import io.github.quickmsg.common.ack.TimeAckManager; +import io.github.quickmsg.common.retry.TimeAckManager; import io.github.quickmsg.common.acl.AclManager; -import io.github.quickmsg.common.auth.PasswordAuthentication; +import io.github.quickmsg.common.auth.AuthManager; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.traffic.TrafficHandlerLoader; import io.github.quickmsg.common.cluster.ClusterRegistry; -import io.github.quickmsg.common.config.AbstractConfiguration; -import io.github.quickmsg.common.config.BootstrapConfig; -import io.github.quickmsg.common.config.ConfigCheck; -import io.github.quickmsg.common.config.Configuration; +import io.github.quickmsg.common.config.*; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.enums.Event; import io.github.quickmsg.common.message.EventRegistry; @@ -22,6 +19,7 @@ import io.github.quickmsg.common.rule.DslExecutor; import io.github.quickmsg.common.topic.TopicRegistry; import io.github.quickmsg.common.transport.Transport; import io.github.quickmsg.core.acl.JCasBinAclManager; +import io.github.quickmsg.core.auth.AuthManagerFactory; import io.github.quickmsg.core.cluster.InJvmClusterRegistry; import io.github.quickmsg.core.mqtt.traffic.CacheTrafficHandlerLoader; import io.github.quickmsg.core.mqtt.traffic.LazyTrafficHandlerLoader; @@ -80,6 +78,8 @@ public abstract class AbstractReceiveContext implements private final AclManager aclManager; + private final AuthManager authManager; + public AbstractReceiveContext(T configuration, Transport transport) { AbstractConfiguration abstractConfiguration = castConfiguration(configuration); RuleDslParser ruleDslParser = new RuleDslParser(abstractConfiguration.getRuleChainDefinitions()); @@ -98,6 +98,7 @@ public abstract class AbstractReceiveContext implements this.messageRegistry.startUp(abstractConfiguration.getEnvironmentMap()); this.metricManager = metricManager(abstractConfiguration.getMeterConfig()); this.aclManager = new JCasBinAclManager(abstractConfiguration.getAclConfig()); + this.authManager = authManagerFactory().provider(abstractConfiguration.getAuthConfig()).getAuthManager(); Optional.ofNullable(abstractConfiguration.getSourceDefinitions()).ifPresent(sourceDefinitions -> sourceDefinitions.forEach(SourceManager::loadSource)); this.timeAckManager = new TimeAckManager(20, TimeUnit.MILLISECONDS, 50); } @@ -118,6 +119,14 @@ public abstract class AbstractReceiveContext implements } } + public AuthManagerProvider authManagerFactory() { + return AuthManagerFactory::new; + } + + public interface AuthManagerProvider { + AuthManagerFactory provider(AuthConfig authConfig); + + } private EventRegistry eventRegistry() { return Event::sender; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java index 3285db9b245146930c62361ca8304cf3f6d7d8ac..e4ee90bdb5bb628b75b643e20cd3d83b3b7415a5 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/mqtt/MqttConfiguration.java @@ -1,6 +1,5 @@ package io.github.quickmsg.core.mqtt; -import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.config.*; import io.github.quickmsg.common.rule.RuleChainDefinition; import io.github.quickmsg.common.rule.source.SourceDefinition; @@ -40,7 +39,7 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private Integer workThreadSize = Runtime.getRuntime().availableProcessors() * 2; - private Integer businessThreadSize = Runtime.getRuntime().availableProcessors() * 4; + private Integer businessThreadSize = Runtime.getRuntime().availableProcessors() ; private Integer businessQueueSize = 100000; @@ -62,6 +61,8 @@ public class MqttConfiguration extends AbstractSslHandler implements AbstractCon private AclConfig aclConfig; + private AuthConfig authConfig; + private Map environmentMap; private Integer messageMaxSize = 4194304; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java index 237135ab76e4af7a6dd907c6e54321cd12fdf079..125aa237fdbcda8e5d6cce8a97cf4cd63102370c 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/CommonProtocol.java @@ -1,6 +1,6 @@ package io.github.quickmsg.core.protocol; -import io.github.quickmsg.common.ack.Ack; +import io.github.quickmsg.common.retry.Ack; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.enums.ChannelStatus; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java index a558156388f4de6e77c0ec2e53db9e1b5569c4f7..5c725ed2833132849991c7d41c3fe879e0b01b90 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/ConnectProtocol.java @@ -2,9 +2,10 @@ package io.github.quickmsg.core.protocol; import io.github.quickmsg.common.acl.AclAction; import io.github.quickmsg.common.acl.AclManager; -import io.github.quickmsg.common.auth.PasswordAuthentication; +import io.github.quickmsg.common.auth.AuthManager; import io.github.quickmsg.common.channel.ChannelRegistry; import io.github.quickmsg.common.channel.MqttChannel; +import io.github.quickmsg.common.config.AuthConfig; import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.enums.ChannelStatus; @@ -66,7 +67,7 @@ public class ConnectProtocol implements Protocol { TopicRegistry topicRegistry = mqttReceiveContext.getTopicRegistry(); MetricManager metricManager = mqttReceiveContext.getMetricManager(); byte mqttVersion = (byte) mqttConnectVariableHeader.version(); - AclManager aclManager = mqttReceiveContext.getAclManager(); + AuthManager authManager = mqttReceiveContext.getAuthManager(); /*check clientIdentifier exist*/ MqttChannel existMqttChannel = channelRegistry.get(clientIdentifier); if (mqttReceiveContext.getConfiguration().getConnectModel() == ConnectModel.UNIQUE) { @@ -95,7 +96,7 @@ public class ConnectProtocol implements Protocol { false).then(mqttChannel.close()); } /*password check*/ - if (aclManager.auth(clientIdentifier, clientIdentifier, AclAction.CONNECT)) { + if (authManager.auth(mqttConnectPayload.userName(),mqttConnectPayload.passwordInBytes(), clientIdentifier)) { /*cancel defer close not authenticate channel */ mqttChannel.disposableClose(); mqttChannel.setClientIdentifier(mqttConnectPayload.clientIdentifier()); diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java index 8d7e6b9d5245adc2caa6a7650150ea5677315186..c8ba575a4b3ac99a864514a4cd58b607835370b5 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishAckProtocol.java @@ -1,11 +1,10 @@ package io.github.quickmsg.core.protocol; -import io.github.quickmsg.common.ack.Ack; +import io.github.quickmsg.common.retry.Ack; import io.github.quickmsg.common.channel.MqttChannel; import io.github.quickmsg.common.context.ReceiveContext; import io.github.quickmsg.common.message.SmqttMessage; import io.github.quickmsg.common.protocol.Protocol; -import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPubAckMessage; diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java index bad4dbc92a26d59527e246eaecc9b76e1f39fe31..2d9112359bb6bb7c9d655e3940d04b7584ce2b01 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/PublishProtocol.java @@ -49,7 +49,7 @@ public class PublishProtocol implements Protocol { MetricManagerHolder.metricManager.getMetricRegistry().getMetricCounter(CounterType.PUBLISH_EVENT).increment(); MqttPublishMessage message = smqttMessage.getMessage(); AclManager aclManager = receiveContext.getAclManager(); - if (!mqttChannel.getIsMock() && !aclManager.auth(mqttChannel.getClientIdentifier(), message.variableHeader().topicName(), AclAction.PUBLISH)) { + if (!mqttChannel.getIsMock() && !aclManager.check(mqttChannel, message.variableHeader().topicName(), AclAction.PUBLISH)) { log.warn("mqtt【{}】publish topic 【{}】 acl not authorized ", mqttChannel.getConnection(), message.variableHeader().topicName()); return Mono.empty(); } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java index 4142b276c863c523770b09a424fac83acb3d3516..8d5fae32351c144d0e58219c30724912557e6ca0 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/protocol/SubscribeProtocol.java @@ -15,6 +15,7 @@ import io.github.quickmsg.common.topic.TopicRegistry; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import org.apache.commons.collections.CollectionUtils; import reactor.core.publisher.Mono; import reactor.util.context.ContextView; @@ -45,9 +46,11 @@ public class SubscribeProtocol implements Protocol { .stream() .peek(mqttTopicSubscription -> this.loadRetainMessage(messageRegistry, mqttChannel, mqttTopicSubscription.topicName())) .map(mqttTopicSubscription -> new SubscribeTopic(mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService(), mqttChannel)) - .filter(subscribeTopic -> aclManager.auth(mqttChannel.getClientIdentifier(), subscribeTopic.getTopicFilter(), AclAction.SUBSCRIBE)) + .filter(subscribeTopic -> aclManager.check(mqttChannel, subscribeTopic.getTopicFilter(), AclAction.SUBSCRIBE)) .collect(Collectors.toSet()); - topicRegistry.registrySubscribesTopic(mqttTopicSubscriptions); + if(CollectionUtils.isNotEmpty(mqttTopicSubscriptions)){ + topicRegistry.registrySubscribesTopic(mqttTopicSubscriptions); + } }).then(mqttChannel.write( MqttMessageBuilder.buildSubAck( message.variableHeader().messageId(), diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java index ff606b417ddeb1f08e526424f8f6abef728a229e..b14fe7aa2c10a8246defff4024c600a6f7ffbb35 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/spi/DefaultProtocolAdaptor.java @@ -54,15 +54,11 @@ public class DefaultProtocolAdaptor implements ProtocolAdaptor { .doParseProtocol(smqttMessage, mqttChannel) .contextWrite(context -> context.putNonNull(ReceiveContext.class, receiveContext)) .subscribeOn(scheduler) - .onErrorContinue(((throwable, o) -> { - - })) .subscribe(aVoid -> { }, error -> { log.error("channel {} chooseProtocol: {} error {}", mqttChannel, mqttMessage, error.getMessage()); ReactorNetty.safeRelease(mqttMessage.payload()); }, () -> ReactorNetty.safeRelease(mqttMessage.payload()))); - } diff --git a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java index d647de2acfdcf691822a5355c0998d74b0bc5203..c2678b2d6bd2fda2d0f3ff9a659033a41d59b57a 100644 --- a/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java +++ b/smqtt-core/src/main/java/io/github/quickmsg/core/ssl/AbstractSslHandler.java @@ -4,8 +4,10 @@ import io.github.quickmsg.common.config.Configuration; import io.github.quickmsg.common.config.SslContext; import io.github.quickmsg.core.mqtt.MqttConfiguration; import io.netty.channel.ChannelOption; +import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import reactor.netty.tcp.SslProvider; import reactor.netty.tcp.TcpServer; import reactor.netty.tcp.TcpSslContextSpec; @@ -22,23 +24,19 @@ public class AbstractSslHandler { public void secure(SslProvider.SslContextSpec sslContextSpec, Configuration configuration) { try { if (configuration.getSsl()) { - File cert; - File key; SslContext sslContext = configuration.getSslContext(); - if (sslContext != null && sslContext.getCrt() != null && sslContext.getKey() != null) { - cert = new File(sslContext.getCrt()); - key = new File(sslContext.getKey()); - + SslContextBuilder sslContextBuilder ; + if (sslContext != null) { + sslContextBuilder = SslContextBuilder.forServer(new File(sslContext.getCrt()), new File(sslContext.getKey())); + if(StringUtils.isNotEmpty(sslContext.getCa())){ + sslContextBuilder= sslContextBuilder.trustManager(new File(sslContext.getCa())); + } } else { SelfSignedCertificate ssc = new SelfSignedCertificate(); - cert = ssc.certificate(); - key = ssc.privateKey(); - log.info("SelfSignedCertificate cert {} key {}",cert.getAbsolutePath(),key.getAbsolutePath()); + sslContextBuilder = SslContextBuilder.forServer(ssc.certificate(),ssc.privateKey()); } - TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forServer(cert, key); - sslContextSpec.sslContext(tcpSslContextSpec); + sslContextSpec.sslContext(sslContextBuilder); } - } catch (Exception e) { log.error(" ssl read error", e); } diff --git a/smqtt-metric/pom.xml b/smqtt-metric/pom.xml index 1a20df7d87d1a3c079b2cc7eea2aad7a82966fa5..88db2918e91c670ce078b1c38e783ddc12a447db 100644 --- a/smqtt-metric/pom.xml +++ b/smqtt-metric/pom.xml @@ -8,7 +8,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-metric diff --git a/smqtt-metric/smqtt-metric-influxdb/pom.xml b/smqtt-metric/smqtt-metric-influxdb/pom.xml index 7dcf10323163f063602b616c361db1ecbb9cb797..6fd7cdff473400d6b48cb2f3ae770014b2e6a169 100644 --- a/smqtt-metric/smqtt-metric-influxdb/pom.xml +++ b/smqtt-metric/smqtt-metric-influxdb/pom.xml @@ -5,7 +5,7 @@ smqtt-metric io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 @@ -15,7 +15,7 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 diff --git a/smqtt-metric/smqtt-metric-prometheus/pom.xml b/smqtt-metric/smqtt-metric-prometheus/pom.xml index fd384860945a26b52217566be865f5886905ed09..47c91c44fd8f130ee2fe9cc67c669f3211e5abb2 100644 --- a/smqtt-metric/smqtt-metric-prometheus/pom.xml +++ b/smqtt-metric/smqtt-metric-prometheus/pom.xml @@ -5,7 +5,7 @@ smqtt-metric io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 @@ -19,7 +19,7 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 diff --git a/smqtt-persistent/pom.xml b/smqtt-persistent/pom.xml index 76e563db248aa76aaa2d4f79f10353b4c52db84d..c3a41482b8edab62c1f1f67ff0e187377ba8ae09 100644 --- a/smqtt-persistent/pom.xml +++ b/smqtt-persistent/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 pom diff --git a/smqtt-persistent/smqtt-persistent-db/pom.xml b/smqtt-persistent/smqtt-persistent-db/pom.xml index f801dbf940990f1beec8739fda925792481a15e9..380551f2634f1d7d1fad0aab480499ee59ac6fcd 100644 --- a/smqtt-persistent/smqtt-persistent-db/pom.xml +++ b/smqtt-persistent/smqtt-persistent-db/pom.xml @@ -5,12 +5,12 @@ smqtt-persistent io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 smqtt-persistent-db - 1.1.3 + 1.1.4 3.14.11 @@ -20,7 +20,7 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 compile diff --git a/smqtt-persistent/smqtt-persistent-redis/pom.xml b/smqtt-persistent/smqtt-persistent-redis/pom.xml index f6f890c7a4fa96219f0629a30f7a5b01384aa49e..eff5c0c646834e825c3e374c6aea8ec24a8d0ee5 100644 --- a/smqtt-persistent/smqtt-persistent-redis/pom.xml +++ b/smqtt-persistent/smqtt-persistent-redis/pom.xml @@ -5,12 +5,12 @@ smqtt-persistent io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 smqtt-persistent-redis - 1.1.3 + 1.1.4 3.15.6 @@ -20,7 +20,7 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 compile diff --git a/smqtt-registry/pom.xml b/smqtt-registry/pom.xml index 9524f5a3687de4c779f2324d878e2bc6f2554d11..1f824b99b9bf8f7cc198fed3f51fa696197dd816 100644 --- a/smqtt-registry/pom.xml +++ b/smqtt-registry/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 pom diff --git a/smqtt-registry/smqtt-registry-scube/pom.xml b/smqtt-registry/smqtt-registry-scube/pom.xml index 86f410a940416380f42e955275732f6eb4d4c179..4436c116f5f855ea57b75592918c9ac320ab6434 100644 --- a/smqtt-registry/smqtt-registry-scube/pom.xml +++ b/smqtt-registry/smqtt-registry-scube/pom.xml @@ -5,7 +5,7 @@ smqtt-registry io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 smqtt-registry-scube @@ -50,7 +50,7 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 provided diff --git a/smqtt-rule/pom.xml b/smqtt-rule/pom.xml index c5765aac501174748ca036da96d6e223b567fc58..2a0e4511a4cdc8daa77c65206775a818b540b8c0 100644 --- a/smqtt-rule/pom.xml +++ b/smqtt-rule/pom.xml @@ -7,7 +7,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-rule diff --git a/smqtt-rule/smqtt-rule-dsl/pom.xml b/smqtt-rule/smqtt-rule-dsl/pom.xml index bd20d2d8b1bfb7d71aa09003431547ab9bac1ec6..9f2740ad42aedd8e4458d7027c241f42e4f59d8b 100644 --- a/smqtt-rule/smqtt-rule-dsl/pom.xml +++ b/smqtt-rule/smqtt-rule-dsl/pom.xml @@ -5,7 +5,7 @@ smqtt-rule io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 @@ -15,13 +15,13 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 provided smqtt-rule-engine io.github.quickmsg - 1.1.3 + 1.1.4 diff --git a/smqtt-rule/smqtt-rule-engine/pom.xml b/smqtt-rule/smqtt-rule-engine/pom.xml index a23e9cfba683d59f766ba2dcdbc1b5be6b16aca0..46a18b11a19d601b7007c3f1226ed1bcf3d6aca3 100644 --- a/smqtt-rule/smqtt-rule-engine/pom.xml +++ b/smqtt-rule/smqtt-rule-engine/pom.xml @@ -7,7 +7,7 @@ smqtt-rule io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-rule-engine @@ -18,44 +18,44 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 provided io.github.quickmsg smqtt-rule-source-kafka - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-rule-source-http - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-rule-source-rocketmq - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-rule-source-rabbitmq - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-rule-source-db - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-rule-source-mqtt - 1.1.3 + 1.1.4 diff --git a/smqtt-rule/smqtt-rule-source/pom.xml b/smqtt-rule/smqtt-rule-source/pom.xml index 95886a06951c81387b0a4a0a7857ada12a44d882..86501650cd48d5d72ac8779b3d186e8426e7b67e 100644 --- a/smqtt-rule/smqtt-rule-source/pom.xml +++ b/smqtt-rule/smqtt-rule-source/pom.xml @@ -8,7 +8,7 @@ smqtt-rule io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-rule-source @@ -33,7 +33,7 @@ io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 provided diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml index a49e6fb1abc1db3791ce52e74c51f9a5c4011e10..71d190c878896d57194397c73df13a0432ff9eb9 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-db/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 io.github.quickmsg smqtt-rule-source-db - 1.1.3 + 1.1.4 3.14.11 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml index 6442de0eeb7cc254aa24ee70af8d62f1860a1a2b..222de62bebbaf0a0736e95680397336564abb67e 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-http/pom.xml @@ -6,7 +6,7 @@ io.github.quickmsg smqtt-rule-source-http - 1.1.3 + 1.1.4 smqtt-rule-source-http @@ -15,14 +15,14 @@ smqtt-rule-source io.github.quickmsg - 1.1.3 + 1.1.4 io.github.quickmsg smqtt-common - 1.1.3 + 1.1.4 provided diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml index a179c0f6088661960c5bcda6ae73c6c02bfd014b..4702acffb721fe870f654029ba116863fed0d3b0 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-kafka/pom.xml @@ -5,12 +5,12 @@ smqtt-rule-source io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 smqtt-rule-source-kafka - 1.1.3 + 1.1.4 https://github.com/quickmsg/smqtt diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml index 803c5b8eaae20823b0f06ea631458ec97720aee9..515f5fd77667885d88a1c909e57aca3978ca9ab5 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-mqtt/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 io.github.quickmsg smqtt-rule-source-mqtt - 1.1.3 + 1.1.4 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml index c72771292070a3e2eeabcfa09ef622bee9c0d784..597b631498ad2319b046e5ebd2e3f15776b39226 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rabbitmq/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 io.github.quickmsg smqtt-rule-source-rabbitmq - 1.1.3 + 1.1.4 diff --git a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml index 7392fdedac1c613c97b2a7716bc6dd1f67528c12..4e5418676f96ec314cf0bf5b7756de8456b25514 100644 --- a/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml +++ b/smqtt-rule/smqtt-rule-source/smqtt-rule-source-rocketmq/pom.xml @@ -5,13 +5,13 @@ smqtt-rule-source io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 io.github.quickmsg smqtt-rule-source-rocketmq - 1.1.3 + 1.1.4 diff --git a/smqtt-spring-boot-starter/pom.xml b/smqtt-spring-boot-starter/pom.xml index 834e59653e93f16daead35dcee0fd07f13887eec..6783cfc5c9c5dcc31a654f38af00b5b619703c60 100644 --- a/smqtt-spring-boot-starter/pom.xml +++ b/smqtt-spring-boot-starter/pom.xml @@ -7,7 +7,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-spring-boot-starter @@ -34,17 +34,17 @@ io.github.quickmsg smqtt-core - 1.1.3 + 1.1.4 smqtt-registry-scube io.github.quickmsg - 1.1.3 + 1.1.4 smqtt-ui io.github.quickmsg - 1.1.3 + 1.1.4 io.projectreactor.netty diff --git a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java index 918c16615b1b20936ef6d71dbe833790077cb3ee..c80dfb169a56b28c4655a165f6589275befb039b 100644 --- a/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java +++ b/smqtt-spring-boot-starter/src/main/java/io/github/quickmsg/starter/AutoMqttConfiguration.java @@ -1,7 +1,6 @@ package io.github.quickmsg.starter; import ch.qos.logback.classic.Level; -import io.github.quickmsg.common.auth.PasswordAuthentication; import io.github.quickmsg.common.config.ConnectModel; import io.github.quickmsg.common.utils.IPUtils; import io.github.quickmsg.core.Bootstrap; diff --git a/smqtt-ui/pom.xml b/smqtt-ui/pom.xml index 933538c71c6e97bdbdd413a5183d02109babce33..3b67b5e4a3bd34add92d8a71967f8c34b631fd67 100644 --- a/smqtt-ui/pom.xml +++ b/smqtt-ui/pom.xml @@ -5,7 +5,7 @@ smqtt io.github.quickmsg - 1.1.3 + 1.1.4 4.0.0 smqtt-ui diff --git a/smqtt-ui/src/pages/dashboard/acl/Acl.vue b/smqtt-ui/src/pages/dashboard/acl/Acl.vue index b5d6ceceef3e3aaa3160b4c19d4171852089378b..a3f6f9d1663418698391320f2ef85d2293e9e334 100644 --- a/smqtt-ui/src/pages/dashboard/acl/Acl.vue +++ b/smqtt-ui/src/pages/dashboard/acl/Acl.vue @@ -4,21 +4,17 @@ layout="inline" class="antAdvancedSearchForm" > - - - + + - - + + - - + + ALL - - CONNECT - SUBSCRIBE @@ -26,29 +22,44 @@ PUBLISH - + + + + + ALL + + + DENY + + + ALLOW + + + 新增 - 查询 - 重置 - 删除 + 查询 + 重置 + 删除 + + - + - + - + - - ALL - - - CONNECT - SUBSCRIBE @@ -96,6 +102,25 @@ + + + + + DENY + + + ALLOW + + + +
@@ -122,16 +147,20 @@ const columns = [ customRender: (text, record, index) => index + 1 }, { - title: '设备', + title: '规则', dataIndex: "subject", }, { - title: '资源', + title: 'topic', dataIndex: "source", }, { title: '类型', dataIndex: "action", + }, + { + title: '策略', + dataIndex: "aclType", } ] export default { @@ -139,11 +168,12 @@ export default { data() { return { params: { - action: "CONNECT", + action: "ALL", current: 1, pageSize: 10, subject: null, - source: null + source: null, + aclType: "ALL" }, pagination: { @@ -153,7 +183,6 @@ export default { showTotal: total => `Total ${total} items`, // 显示总数 onShowSizeChange: (page, pageSize) => { this.pagination.pageSize = pageSize - console.log(page) } }, selectedRowKeys: [], @@ -161,7 +190,12 @@ export default { dataSource: null, visible: false, confirmLoading: false, - form: {} + form: { + action: "PUBLISH", + subject: null, + source: null, + aclType: "ALLOW" + } } }, mounted() { @@ -169,11 +203,12 @@ export default { }, methods: { reset() { - this.params.action = "CONNECT" + this.params.action = "ALL" this.params.current = 1 this.params.pageSize = 10 this.params.subject = null this.params.source = null + this.params.aclType = null this.queryActionData() }, @@ -210,8 +245,12 @@ export default { }) this.visible = false; this.confirmLoading = false - this.params.action = this.form.action - this.form = {} + this.form = { + action: "PUBLISH", + subject: null, + source: null, + aclType: "ALLOW" + } await this.queryActionData() }, diff --git a/smqtt-ui/src/router/config.js b/smqtt-ui/src/router/config.js index affdeb73631867225928254c89d458ddc9a980f6..a59bb640adfe678922bf9207c6c6b872481d457b 100644 --- a/smqtt-ui/src/router/config.js +++ b/smqtt-ui/src/router/config.js @@ -56,7 +56,7 @@ const options = { }, { path: 'acl', - name: '权限校验', + name: '访问控制', component: () => import('@/pages/dashboard/acl'), } diff --git a/smqtt-ui/vue.config.js b/smqtt-ui/vue.config.js index c08d4da070dc1256f22c4ad5a75034bb799f1f04..f4bfbe6be5894de09d99a57ad69ff3605dcf3601 100644 --- a/smqtt-ui/vue.config.js +++ b/smqtt-ui/vue.config.js @@ -23,14 +23,14 @@ const assetsCDN = { css: [ ], js: [ - '//cdn.jsdelivr.net/npm/vue@2.6.11/dist/vue.min.js', - '//cdn.jsdelivr.net/npm/vue-router@3.3.4/dist/vue-router.min.js', - '//cdn.jsdelivr.net/npm/vuex@3.4.0/dist/vuex.min.js', - '//cdn.jsdelivr.net/npm/axios@0.19.2/dist/axios.min.js', - '//cdn.jsdelivr.net/npm/nprogress@0.2.0/nprogress.min.js', - '//cdn.jsdelivr.net/npm/clipboard@2.0.6/dist/clipboard.min.js', - '//cdn.jsdelivr.net/npm/@antv/data-set@0.11.4/build/data-set.min.js', - '//cdn.jsdelivr.net/npm/js-cookie@2.2.1/src/js.cookie.min.js' + '//fastly.jsdelivr.net/npm/vue@2.6.11/dist/vue.min.js', + '//fastly.jsdelivr.net/npm/vue-router@3.3.4/dist/vue-router.min.js', + '//fastly.jsdelivr.net/npm/vuex@3.4.0/dist/vuex.min.js', + '//fastly.jsdelivr.net/npm/axios@0.19.2/dist/axios.min.js', + '//fastly.jsdelivr.net/npm/nprogress@0.2.0/nprogress.min.js', + '//fastly.jsdelivr.net/npm/clipboard@2.0.6/dist/clipboard.min.js', + '//fastly.jsdelivr.net/npm/@antv/data-set@0.11.4/build/data-set.min.js', + '//fastly.jsdelivr.net/npm/js-cookie@2.2.1/src/js.cookie.min.js' ] }