2 Star 0 Fork 0

Admin/bd_sender

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main.go 6.48 KB
一键复制 编辑 原始数据 按行查看 历史
钟林峰 提交于 2020-03-12 15:07 +08:00 . save
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"strings"
"time"
log "github.com/sirupsen/logrus"
)
// SenderConfig holds the slots and parsers a Sender is built from.
// Both fields are unexported, so a value can only be populated from inside
// this package (main does so with a keyed literal).
type SenderConfig struct {
// slots are addressed by the index passed to Sender.encodeSlot.
slots []Slot
// parsers are applied, in slice order, to every value received on
// Sender.dataCh.
parsers []Parser
}
// Sender collects parsed input into a cached JSON document and, on request,
// encodes that document with one of its configured slots and forwards the
// resulting bytes to an output channel.
type Sender struct {
// Config is the slot/parser configuration given to New.
Config SenderConfig
// data is the cached JSON document; run updates it through merge and passes
// it to encodeSlot.
data string
// context *context.Context
// stopCh is one of the channels run selects on.
stopCh chan int
// dataCh delivers raw input to run; main hands it to initSources.
dataCh chan rdata
// encodeCh carries the index of the slot that run should encode next.
encodeCh chan int
// outputCh receives non-empty encoded slots. New leaves it nil; it is
// assigned through setOutput.
outputCh chan []byte
}
// // SenderState sender state
// type SenderState struct {
// Data map[string]interface{}
// }
// rdata is the input unit carried on Sender.dataCh and handed to each
// Parser's parse method.
type rdata interface {
// GetProperty looks up a named property.
// NOTE(review): presumably the bool reports whether prop exists; confirm
// against the implementations, which are not in this file.
GetProperty(prop string) (interface{}, bool)
// Data returns the payload as bytes.
// NOTE(review): presumably the raw received payload; confirm.
Data() []byte
}
// New builds a Sender around config and allocates its unbuffered data, stop
// and encode channels. The output channel is left nil and must be supplied
// through setOutput. The returned error is always nil.
func New(config SenderConfig) (*Sender, error) {
	return &Sender{
		Config:   config,
		stopCh:   make(chan int),
		encodeCh: make(chan int),
		dataCh:   make(chan rdata),
	}, nil
}
// setOutput installs ch as the channel on which run publishes encoded slots.
func (sender *Sender) setOutput(ch chan []byte) {
	sender.outputCh = ch
}
// encodeSlot encodes data with the configured slot at index slot and returns
// the slot's output.
//
// An index outside [0, len(slots)) is reported as an error instead of
// panicking, so a bad trigger value cannot take down the run goroutine.
func (sender *Sender) encodeSlot(slot int, data string) ([]byte, error) {
	slots := sender.Config.slots
	if slot < 0 || slot >= len(slots) {
		return nil, fmt.Errorf("slot index %d out of range [0, %d)", slot, len(slots))
	}
	return slots[slot].Encode(data)
}
// func (sender *Sender) nextSlot() {
// sender.currentSlot = (sender.currentSlot + 1) % len(sender.Config.slots)
// }
// merge overlays the top-level keys of the JSON object part onto the JSON
// object data and returns the combined object as a JSON string.
//
// An empty data or part is treated as "{}". Keys present in both are taken
// from part; the merge is shallow (nested objects are replaced, not merged).
// If either input is not a JSON object, or the result cannot be marshalled,
// data is returned unchanged together with the error.
func merge(data string, part string) (string, error) {
	if data == "" {
		data = "{}"
	}
	if part == "" {
		part = "{}"
	}

	var base map[string]interface{}
	if err := json.Unmarshal([]byte(data), &base); err != nil {
		return data, err
	}
	var overlay map[string]interface{}
	if err := json.Unmarshal([]byte(part), &overlay); err != nil {
		return data, err
	}

	// A JSON "null" decodes to a nil map; writing into it would panic.
	if base == nil {
		base = make(map[string]interface{}, len(overlay))
	}
	for k, v := range overlay {
		base[k] = v
	}

	out, err := json.Marshal(base)
	if err != nil {
		return data, err
	}
	return string(out), nil
}
// run is the sender's event loop. It reacts to three events until stopped:
//
//   - a value on dataCh is offered to every configured parser, and each
//     successful parse result is merged into the cached document;
//   - a slot index on encodeCh encodes the cached document with that slot and
//     publishes non-empty output on outputCh;
//   - a signal on stopCh (or its closure) ends the loop.
func (sender *Sender) run() {
	for {
		select {
		case data := <-sender.dataCh:
			for _, parser := range sender.Config.parsers {
				res, err := parser.parse(data)
				if err != nil {
					// A parser that rejects the input contributes nothing;
					// the remaining parsers still get their turn.
					continue
				}
				merged, err := merge(sender.data, res)
				if err != nil {
					log.WithFields(log.Fields{"err": err}).Error("merge data error")
					// Keep the previous document rather than caching the
					// result of a failed merge.
					continue
				}
				sender.data = merged
			}
		case ch := <-sender.encodeCh:
			bs, err := sender.encodeSlot(ch, sender.data)
			if err != nil {
				log.WithFields(log.Fields{"err": err, "ch": ch}).Info("encode slot failed")
			}
			if len(bs) == 0 {
				continue
			}
			// Publishing may block (unbuffered or not-yet-set output channel),
			// so remain stoppable while waiting for the consumer.
			select {
			case sender.outputCh <- bs:
			case <-sender.stopCh:
				return
			}
		case <-sender.stopCh:
			// A bare break would only leave the select; return ends the loop.
			return
		}
	}
}

// Start launches the event loop in a new goroutine and returns immediately.
// The returned error is always nil. Each call starts another loop, so Start
// should be called once per Sender.
func (sender *Sender) Start() error {
	go sender.run()
	return nil
}

// Stop signals the event loop to exit by closing the stop channel. It does
// not wait for the loop to finish. Calling Stop again after it has returned
// is a no-op; concurrent calls to Stop are not supported. The returned error
// is always nil.
func (sender *Sender) Stop() error {
	if sender.stopCh == nil {
		// Sender was not built by New; there is nothing to signal.
		return nil
	}
	select {
	case <-sender.stopCh:
		// Already closed by an earlier Stop.
	default:
		close(sender.stopCh)
	}
	return nil
}
// newSlots builds one Slot per SlotConfig, copying every configured item
// field by field and passing the items together with the slot's head data to
// NewSlot. It panics if NewSlot reports an error.
func newSlots(configs []SlotConfig) []Slot {
	result := make([]Slot, 0, len(configs))
	for _, cfg := range configs {
		items := make([]item, 0, len(cfg.Items))
		for _, src := range cfg.Items {
			items = append(items, item{
				Type:  src.Type,
				Kind:  src.Kind,
				Path:  src.Path,
				Scale: src.Scale,
				Value: src.Value,
				Min:   src.Min,
				Max:   src.Max,
				Width: src.Width,
			})
		}

		slot, err := NewSlot(cfg.Head.Data, items)
		if err != nil {
			panic(err)
		}
		result = append(result, *slot)
	}
	return result
}
// newParsers builds one Parser per ParserConfig, choosing the constructor by
// the config's Type ("BYTES" or "JSON"). It panics on an unknown type or when
// a constructor reports an error.
func newParsers(configs []ParserConfig) []Parser {
	parsers := make([]Parser, len(configs))
	for i := range configs {
		cfg := configs[i]

		var (
			built Parser
			err   error
		)
		switch cfg.Type {
		case "BYTES": // nolint
			built, err = NewBytesParser(cfg)
		case "JSON": // nolint
			built, err = NewJSONParser(cfg)
		default:
			panic(fmt.Errorf("unknown parser Config: %+v", cfg))
		}
		if err != nil {
			panic(err)
		}
		parsers[i] = built
	}
	return parsers
}
// newBeidouSender creates a Beidou sender from config, starts it in its own
// goroutine, and connects it to sender: a fresh unbuffered channel is
// installed as sender's output, every payload received on it is passed to the
// Beidou sender's Send, and every result reported on the notification channel
// is logged. It panics if the Beidou sender cannot be created.
func newBeidouSender(sender *Sender, config BeidouSenderConfig) {
	bSender, err := NewBeidouSender(config)
	if err != nil {
		panic(err)
	}
	go bSender.Start()

	var (
		payloads = make(chan []byte)
		results  = make(chan BeidouResult)
	)
	sender.setOutput(payloads)

	// forward hands each encoded payload to the Beidou sender.
	forward := func() {
		for payload := range payloads {
			if sendErr := bSender.Send(payload, results); sendErr != nil {
				log.WithFields(log.Fields{"err": sendErr}).Error("error output data")
			}
		}
	}
	// report logs every result delivered on the notification channel.
	report := func() {
		for result := range results {
			log.WithFields(log.Fields{"result": result}).Info("sending result")
		}
	}

	go forward()
	go report()
}
// loadConfig reads the file at path and decodes its JSON content into a
// Config. It panics if the file cannot be read or the content is not valid
// JSON for Config.
func loadConfig(path string) Config {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		panic(err)
	}

	var cfg Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	return cfg
}
// initSources creates, opens and starts one receiver per SourceConfig, each
// delivering into ch. Only the "UDP" source type is supported. It panics on
// any other type and on any error while decoding parameters or bringing a
// receiver up.
func initSources(configs []SourceConfig, ch chan rdata) {
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	for _, src := range configs {
		if src.Type != "UDP" {
			panic(fmt.Errorf(`unsupported source type: %s`, src.Type))
		}

		var udp UDPConfig
		must(json.Unmarshal(src.Params, &udp))

		receiver, err := NewUDPReceiver(udp, ch)
		must(err)
		must(receiver.Open())
		must(receiver.Start())
	}
}
// main loads the configuration named by --config, builds the slots, parsers
// and sender, wires the Beidou output and the input sources, and then
// triggers the slots round-robin, one per configured interval, forever.
func main() {
	rawPath := flag.String("config", "", "path to the JSON configuration file")
	flag.Parse()

	// strings.Trim with an empty cutset removes nothing; TrimSpace is what
	// rejects a blank path.
	configPath := strings.TrimSpace(*rawPath)
	if configPath == "" {
		fmt.Fprintf(os.Stderr, "%s --config <config file>\n", os.Args[0])
		os.Exit(-1)
	}

	cfg := loadConfig(configPath)
	log.WithFields(log.Fields{"config": cfg}).Info("loading config")

	slots := newSlots(cfg.Slots)
	if len(slots) == 0 {
		// The trigger loop below computes slotIndex % len(slots).
		log.Fatal("no slots configured")
	}
	parsers := newParsers(cfg.Parsers)

	sender, err := New(SenderConfig{
		slots:   slots,
		parsers: parsers,
	})
	if err != nil {
		panic(err)
	}

	// Install the output channel before the run loop starts so the loop never
	// observes a nil or concurrently written outputCh.
	newBeidouSender(sender, cfg.BeidouSenderConfig)

	if err = sender.Start(); err != nil {
		panic(err)
	}
	initSources(cfg.Sources, sender.dataCh)

	interval := time.Duration(cfg.BeidouSenderConfig.Interval) * time.Second
	slotIndex := 0
	for {
		log.WithFields(log.Fields{"index": slotIndex}).Info("trigger sender slot")
		sender.encodeCh <- slotIndex % len(slots)
		slotIndex = slotIndex + 1
		time.Sleep(interval)
	}
}
// func init() {
// // Log as JSON instead of the default ASCII formatter.
// log.SetFormatter(&log.JSONFormatter{})
// // Output to stdout instead of the default stderr
// // Can be any io.Writer, see below for File example
// log.SetOutput(os.Stdout)
// // Only log the warning severity or above.
// log.SetLevel(log.InfoLevel)
// }
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/shipnet_masterclock/bd_sender.git
git@gitee.com:shipnet_masterclock/bd_sender.git
shipnet_masterclock
bd_sender
bd_sender
master

搜索帮助