代码拉取完成,页面将自动刷新
package main
import (
"encoding/json"
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
"gitee.com/shipnet/flogo-common/netclient"
"github.com/avast/retry-go"
log "github.com/sirupsen/logrus"
)
type NetworkSettings struct {
Addr string `json:"addr,omitempty" mapstructure:"addr"`
Timeout string `json:"timeout,omitempty" mapstructure:"timeout"`
ReconnectInterval string `json:"reconnectInterval,omitempty" mapstructure:"reconnectInterval"`
}
type Config struct {
NetworkSettings NetworkSettings `json:"networkSettings,omitempty" mapstructure:"source"`
StaticEntries map[string]interface{} `json:"static_entries,omitempty" mapstructure:"staticEntries"`
}
func main() {
config := Config{}
err := Viper.Unmarshal(&config)
if err != nil {
log.WithError(err).Fatal("failed load config")
}
c := cache{}
for k, v := range config.StaticEntries {
err := c.setEntry(k, entry{
Value: v,
Static: true,
})
if err != nil {
log.WithError(err).Fatal("failed set static entries")
os.Exit(-1)
}
}
go sendLoop(&c)
settings := netclient.Settings{
Addr: config.NetworkSettings.Addr,
Timeout: config.NetworkSettings.Timeout,
ReconnectInterval: config.NetworkSettings.ReconnectInterval,
}
nc, err := netclient.New(settings)
if err != nil {
log.WithFields(log.Fields{"err": err.Error(), "settings": settings}).Fatal("failed create netclient")
os.Exit(-1)
}
r, err := nc.Receive()
if err != nil {
log.WithFields(log.Fields{"err": err.Error()}).Fatal("failed get netclient receiver")
os.Exit(-2)
}
ch, err := r.Events()
if err != nil {
log.WithFields(log.Fields{"err": err.Error()}).Fatal("failed get events channel")
os.Exit(-3)
}
framerConfig := FramerConfig{
STX: []byte("$"),
ETX: []byte("\n"),
MaxBytes: 200,
}
framer, err := NewFramer(framerConfig)
if err != nil {
log.WithFields(log.Fields{"err": err.Error()}).Fatal("failed create framer")
}
current := []byte{}
for e := range ch {
switch e.Type {
case netclient.EventData:
{
log.WithFields(log.Fields{"eventType": e.Type, "connInfo": e.ConnInfo, "payload": e.Data}).Info("got data")
current = append(current, e.Data...)
result := framer.Frame(current)
frames := result.Frames()
for _, f := range frames {
err := handleSentence(&c, string(f))
if err != nil {
log.WithFields(log.Fields{"err": err.Error(), "sentence": string(f)}).Error("failed process sentence")
}
}
current = result.Remains()
if len(current) > 100 {
current = current[100:]
}
}
default:
{
log.WithFields(log.Fields{"eventType": e.Type}).Info("netclient event")
}
}
}
}
func handleSentence(c *cache, sentence string) error {
sentence = strings.TrimSpace(sentence)
if len(sentence) == 0 {
return fmt.Errorf("empty sentence")
}
if sentence[0] != '$' {
return fmt.Errorf("malformed sentence, expect start with $: %s", sentence)
}
sentence = strings.TrimPrefix(sentence, "$")
parts := strings.Split(sentence, "*")
if len(parts) != 2 {
return fmt.Errorf("malformed sentence: %s", sentence)
}
chk := XorCheck([]byte(parts[0]))
sum, err := strconv.ParseUint(parts[1], 16, 64)
if err != nil {
return fmt.Errorf("malformed sentence, failed parse checksum: %s", sentence)
}
if uint64(chk) != sum {
return fmt.Errorf("corrupted sentence, expect %d, got %d", chk, sum)
}
content := strings.TrimSuffix(strings.TrimSpace(parts[0]), ",")
fields := strings.Split(content, ",")
if len(fields) == 0 {
return fmt.Errorf("malformed sentence, no fields found: %s", sentence)
}
name := strings.TrimSpace(fields[0])
var m map[string]entry
switch name {
case "SYRSA":
{
m, err = parseRSA(fields[1:])
if err != nil {
return fmt.Errorf("malformed rsa sentence: %s, err: %v", sentence, err)
}
break
}
case "APASD":
{
m, err = parseASD(fields[1:])
if err != nil {
return fmt.Errorf("malformed asd sentence: %s, err: %v", sentence, err)
}
}
case "APRSA":
{
m, err = parseRSA(fields[1:])
if err != nil {
return fmt.Errorf("malformed rsa sentence: %s, err: %v", sentence, err)
}
}
default:
{
return fmt.Errorf("unknown sentence: %s", sentence)
}
}
err = c.set(m)
return err
}
func parseRSA(fields []string) (map[string]entry, error) {
if len(fields) == 0 {
return nil, fmt.Errorf("malformed rsa sentence")
}
field := strings.TrimSpace(fields[0])
if !strings.Contains("LR", field[0:1]) {
return nil, fmt.Errorf("must start with L/R")
}
val, err := strconv.ParseFloat(field[1:], 64)
if err != nil {
return nil, err
}
m := map[string]entry{}
if field[0] == 'R' {
val = -val
}
m[RUDDER_SENSOR_ANGLE_KEY] = entry{
Value: val,
Timestamp: time.Now(),
}
return m, nil
}
func parseASD(fields []string) (map[string]entry, error) {
if len(fields) != 30 {
return nil, fmt.Errorf("expect %d fields, got %d", 30, len(fields))
}
m := map[string]entry{}
now := time.Now()
headingSetIndex := 0
headingSet, err := strconv.ParseFloat(fields[headingSetIndex], 64)
if err != nil {
return nil, fmt.Errorf("failed parse headingSet, err = %w", err)
}
m[RUDDER_HEADING_SET_KEY] = entry{
Value: headingSet,
Timestamp: now,
}
slewRateIndex := 1
slewRate, err := strconv.ParseFloat(fields[slewRateIndex], 64)
if err != nil {
return nil, fmt.Errorf("failed parse slewRate, err = %w", err)
}
m[RUDDER_SLEW_RATE_KEY] = entry{
Value: slewRate,
Timestamp: now,
}
commandAngleIndex := 2
commandAngle, err := parseAngle(fields[commandAngleIndex])
if err != nil {
return nil, fmt.Errorf("failed parse commandAngle, err = %w", err)
}
m[RUDDER_COMMAND_ANGLE_KEY] = entry{
Value: commandAngle,
Timestamp: now,
}
pumpOpModeIndex := 4
pumpOpMode := strings.TrimSpace(fields[pumpOpModeIndex])
m[RUDDER_PUMP_OP_MODE_KEY] = entry{
Value: pumpOpMode,
Timestamp: now,
}
commandModeIndex := 5
commandMode := strings.TrimSpace(fields[commandModeIndex])
m[RUDDER_COMMAND_MODE] = entry{
Value: commandMode,
Timestamp: now,
}
alarmStartIdx := 6
for idx, a := range alarms {
alarm, err := parseAlarm(fields[alarmStartIdx+idx])
if err != nil {
return nil, fmt.Errorf("failed parse %s, err = %w", a.key, err)
}
m[a.key] = entry{
Value: alarm,
Timestamp: now,
}
}
return m, nil
}
func parseAngle(s string) (float64, error) {
s = strings.TrimSpace(s)
if !strings.Contains("LR", s[0:1]) {
return 0, fmt.Errorf("invalid symbol, expect L/R, got %s", s)
}
v, err := strconv.ParseFloat(s[1:], 64)
if err != nil {
return 0, err
}
if s[0] == 'R' {
return -v, nil
}
return v, nil
}
func parseAlarm(s string) (int, error) {
s = strings.TrimSpace(s)
if s == "A" {
return 0, nil
} else {
return 1, nil
}
}
func sendLoop(c *cache) {
duration := time.Second * 60
ticker := time.NewTicker(time.Second * 60)
for t := range ticker.C {
log.WithFields(log.Fields{"tick": t}).Info("do ticker")
deadline := time.Now().Add(duration * -1)
send(c, deadline, time.Now())
}
}
func send(c *cache, deadline time.Time, now time.Time) {
msg, err := buildMessage(c, deadline, now)
if err != nil {
return
}
address := Viper.GetString("dest.addr")
if address == "" {
log.WithFields(log.Fields{"err": "no destination configured"}).Error("failed send payload")
return
}
conn, err := net.Dial("udp", address)
if err != nil {
return
}
defer conn.Close()
err = retry.Do(func() error {
n, err := conn.Write(msg)
if err != nil || n != len(msg) {
log.WithFields(log.Fields{"err": err.Error(), "written": n}).Error("failed send payload")
return err
}
log.WithFields(log.Fields{"msg": string(msg)}).Info("write payload")
return nil
}, retry.Attempts(3), retry.Delay(time.Second))
if err != nil {
log.WithFields(log.Fields{"err": err.Error()}).Error("failed send message")
}
}
func buildMessage(c *cache, deadline time.Time, now time.Time) ([]byte, error) {
m := map[string]interface{}{}
for k, v := range c.getAll() {
if v.Static || !v.Timestamp.Before(deadline) {
m[k] = v.Value
}
}
m["time"] = now
var ida interface{}
var ok bool
if ida, ok = m["id"]; !ok {
return nil, fmt.Errorf("no id configured")
}
id := fmt.Sprintf("%v", ida)
ma := map[string]interface{}{
"id": id,
"data": m,
}
bs, err := json.Marshal(ma)
return bs, err
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。