# protocols **Repository Path**: chenyongfeng/protocols ## Basic Information - **Project Name**: protocols - **Description**: 基于netty的通信框架,支持标准IEC104协议 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 15 - **Created**: 2024-05-29 - **Last Updated**: 2024-05-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # protocols #### 介绍 本项目基于Netty实现,旨在对协议部分的通信过程进行封装,以实现与具体业务场景的解耦,提升开发效率。代码沿用和拓展了Netty的Promise异步编程模式,可满足基本的通信需求,若想详细了解可以参阅代码中类和方法上的注释,下面对各类规约进行简单的使用说明: 1. 标准Iec104规约 ``` /** * IEC104主站功能说明: * 1、基于netty nio多路复用以及扩展的Promise模式异步编程; * 2、较为完整的启动流程:连接-》启动传输-》定时总召唤、电度量召唤; * 3、较为完善的异常处理机制:断线自动重连;T1T2T3超时处理;kw流量控制;等等 * 4、使用简单:仅需一套简单的配置,即可实现各类测点数据的采集以及遥控、遥调命令下发。 */ @Slf4j public class TestClient { private static JFrame createJFrame() { JFrame jFrame = new JFrame(); Dimension screenSize = Toolkit.getDefaultToolkit().getScreenSize(); jFrame.setSize(screenSize); return jFrame; } public static void main(String[] args) throws Throwable { // 1、获取主站管理器 Iec104MasterManager masterManager = Iec104MasterManager.get(); // MasterStateManager中实现了一个简单的监视面板,可以用来显示当前的连接情况 JFrame jFrame = createJFrame(); jFrame.add(masterManager.getMonitorPanel()); jFrame.setVisible(true); // 2、新建连接,返回的future类似netty的ChannelFuture,只不过这个是关联的主站Master MasterFuture masterConnFuture = masterManager.connect( MasterConfig.configurer() .remote("127.0.0.1", 2404) // 连接的子站ip .local(1) // 公共地址 .controlConfig(MasterControlConfig.configurer() // 控制配置 .maxReconnectCount(-1) // 最大重连次数,-1代表无限重连 .autoStartDtV(true) // 是否自动启动传输激活 .t0t1t2t3(6, 3, 2, 4) // t0, t1, t2 ,t3 超时时间,单位为秒 .kw(12, 8) // 发送窗口和接收窗口的大小,要求k大于w .apduToStringFormat(NBytePduClip.ToStringFormat.DEFAULT_FORMAT) // APDU的打印格式,支持详细信息、十六进制、二进制 .configOk() ) .dataConfig(MasterDataConfig.configurer() // 数据配置 .totalCall100PeriodSeconds(60) // 总召唤周期,单位为秒 .totalCall101PeriodSeconds(60) // 电度量召唤周期,单位为秒 .configOk() ) .configOk() ); // 等待连接结果 // 这里既可以通过添加监听器实现异步,也可以调用sync()方法同步等待连接结果 masterConnFuture.addListener(new MasterFutureListener() { @Override public void operationComplete(MasterFuture future) { if (future.isSuccess()) { log.info("连接成功!"); } else { log.info("连接失败!"); } } }).sync(); // 如果连接成功,该master将加入至masterManager。可以从主站管理器或者MasterFuture中获取指定主站 Master master1 = Iec104MasterManager.get().getMaster("127.0.0.1", 2404); // 如果连接失败,master1为null if (master1 == null) return; Master master2 = masterConnFuture.master(); log.info("master1={}, master2={}, master1{}master2", master1, master2, master1 == master2); // 动态添加或移除回调处理函数 PduRecvCallback callback = frame -> { log.info("接收到帧:{}", frame); return false; // 每次接收到apdu都会执行 }; master2.getIAsduRecver().addRecvCallback(callback); // master2.getIAsduRecver().removeRecvCallback(callback); // 主站Asdu发送器提供了各种发送数据的API MasterIAsduSender iAsduSender = master2.getIAsduSender(); iAsduSender.sendTotalCall100(); iAsduSender.sendTotalCall101(); // 查看master状态 MControlInfo controlInfo = master2.controlInfo(); new Thread(() -> { int count = 0; while(++count < 100) { try { Thread.sleep(1000 * 2); log.info("isConnected: " + master2.isConnected()); log.info("isDoingTotalCall100: " + controlInfo.isDoingTotalCall100()); log.info("isStartedDt: " + controlInfo.isStartedDt()); log.info("isInitCompleted: " + controlInfo.isInitCompleted()); log.info(""); log.info("isConnected: " + master1.isConnected()); log.info("isDoingTotalCall100: " + controlInfo.isDoingTotalCall100()); log.info("isStartedDt: " + controlInfo.isStartedDt()); log.info("isInitCompleted: " + controlInfo.isInitCompleted()); log.info(""); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000 * 1); // 执行遥控命令,遥调同理 if (master2.controlInfo().isInitCompleted()) { master2.getIAsduSender().sendOnePointTcSelect(24578, true).addListener(new MasterFutureListener() { @Override public void operationComplete(MasterFuture future) { if (future.isSuccess()) { log.info("遥控选择成功!"); future.master().getIAsduSender().sendOnePointTcExecute(24578, true).addListener(new MasterFutureListener() { @Override public void operationComplete(MasterFuture future) { if (future.isSuccess()) { log.info("遥控执行成功!"); } else { log.warn("遥控执行失败!"); } } }); } else { log.warn("遥控选择失败!"); } } }); } // 尝试运行1分钟 Thread.sleep(1000 * 60); log.info("<<<<<<<<<<< 即将断开..."); masterManager.disconnect(master1).sync(); // 关闭所有连接(如果有的话) masterManager.disconnectAll().whenComplete(new BiConsumer() { @Override public void accept(Boolean isSuccess, Throwable throwable) { if (isSuccess) { log.info("断开成功!"); } else { log.info("断开失败!"); } } }); // 关闭主站管理器,即使不关闭,程序退出也会自动关闭 masterManager.close(); } } ``` ``` /** * IEC104子站功能说明: * 1、基于Netty NIO多路复用;异步编程模式; * 2、较为完整的启动流程:启动传输响应-》初始化结束-》... * 3、支持主站IP过滤,在白名单之外的主站连接将被拒绝; * 4、支持遥测短浮点数;单点/双点遥信的总召唤、变化数据上送(需要根据具体的业务数据进行调用); * 5、支持遥控和遥调过程(需要根据具体的场景进行设置) * 6、支持总召唤、电度量召唤(根据具体业务场景进行调用) */ public class TestServer { public static void main(String[] args) throws Throwable { Iec104SlaveManager slaveManager = Iec104SlaveManager.get(); // 说明:基于Netty的主从Reactor模式,Netty线程仅用于处理IO,不处理实际业务逻辑,以提升IO效率 // 可通过创建自定义执行器来处理业务逻辑,如查询收集设备测值数据等 ExecutorService executorService = Executors.newFixedThreadPool(4); SlaveFuture slaveFuture = slaveManager.openSlave( SlaveConfig.configurer() .local("172.18.83.112", 2404) // 本机IP和端口号设置 .rtuAddr(1) // 公共地址 .controlConfig(SlaveControlConfig.configurer() // 控制配置 .t0t1t2t3(6, 3, 2, 4) // t0, t1, t2 ,t3 超时时间,单位为秒 .kw(12, 8) // 要求 k > w .ipFilterRule(new IpWhiteListFilterRule("127.0.0.1", "172.18.83.112")) // 主站IP白名单 .configOk() ) .dataConfig(SlaveDataConfig.configurer() // 数据配置 // 总召唤处理器,这里是模拟实现,可根据具体场景自行实现该DataReactor .totalCall100DataReactor(new TotalCall100DataReactor(executorService, TotalCall100DataReactor.TmDataType.FLOAT, TotalCall100DataReactor.TsDataType.ONE_POINT) { @Override protected boolean accept0(SlaveChannel slaveChannel) { return true; } @Override protected List collectTmData0(SlaveChannel slaveChannel) { List tmDataList = new ArrayList<>(100); Random random = new Random(); for (int i = 0; i < 100; i++) { if (random.nextBoolean()) { int addr = i; tmDataList.add(new TmData() { @Override public BigDecimal getValue() { return BigDecimal.valueOf(0); } @Override public boolean isValid() { return true; } @Override public int getAddress() { return addr; } }); } } return tmDataList; } @Override protected List collectTsData0(SlaveChannel slaveChannel) { List tsDataList = new ArrayList<>(100); Random random = new Random(); for (int i = 0; i < 100; i++) { if (random.nextBoolean()) { int addr = i; tsDataList.add(new TsData() { @Override public boolean isSwitchOn() { return random.nextBoolean(); } @Override public boolean isValid() { return true; } @Override public boolean isCurrVal() { return true; } @Override public boolean isReplaced() { return false; } @Override public boolean isLocked() { return false; } @Override public int getAddress() { return addr; } }); } } return tsDataList; } }) // 电度量召唤处理器,这里是模拟实现,可根据具体场景自行实现该DataReactor .totalCall101DataReactor(new TotalCall101DataReactor(executorService, TotalCall101DataReactor.TpDataType.CUMULANT) { @Override protected boolean accept0(SlaveChannel slaveChannel) { return true; } @Override protected List collectTpData0(SlaveChannel slaveChannel) { List tpDataList = new ArrayList<>(100); Random random = new Random(); for (int i = 0; i < 100; i++) { if (random.nextBoolean()) { int addr = i; tpDataList.add(new TpData() { @Override public long getValue() { return 0; } @Override public boolean isValid() { return true; } @Override public boolean isAdjusted() { return false; } @Override public boolean isOverflowed() { return false; } @Override public int getSeq() { return 0; } @Override public int getAddress() { return addr; } }); } } return tpDataList; } }) // 遥控命令处理器,这里是模拟实现,可根据具体场景自行实现该DataReactor .tcDataReactor(new TcDataReactor(executorService, Tc.Type.ONE_POINT) { @Override public CompletableFuture acceptSelect(SlaveChannel slaveChannel, int infoObjAddr) { return CompletableFuture.completedFuture(true); } @Override public CompletableFuture acceptExecute(SlaveChannel slaveChannel, int infoObjAddr) { return CompletableFuture.completedFuture(true); } }) // 遥调命令处理器,这里是模拟实现,可根据具体场景自行实现该DataReactor .taDataReactor(new TaDataReactor(executorService, Ta.Type.FLOAT) { @Override public CompletableFuture acceptSelect(SlaveChannel slaveChannel, int infoObjAddr) { return supplyAsync(() -> true); } @Override public CompletableFuture acceptExecute(SlaveChannel slaveChannel, int infoObjAddr) { return supplyAsync(() -> true); } }) .tmVaryType(Tm.VaryType.FLOAT) // 遥测变化类型 .configOk() ) .configOk() ); // 异步处理打开结果 slaveFuture.addListener(new SlaveFutureListener() { @Override public void operationComplete(SlaveFuture future) { if (future.isSuccess()) { System.out.println("端口打开成功"); } else { System.out.println("端口打开失败,原因:" + future.cause()); } } }); // -------------- 等待其运行一段时间 --------------------------- Thread.sleep(60 * 60 * 1000); // 关闭该子站 slaveManager.closeSlave(slaveFuture.slave()); executorService.shutdown(); } } ``` 2. ModbusTcp规约 后续待完善。