package main

import (
    "context"
    "fmt"
    "log"
    "strings"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/xuri/excelize/v2"
)
// readExcel reads the workbook at filename and returns one map per data row
// (keys: catName, name, content) plus the list of distinct categories found
// in the first column, in order of first appearance.
func readExcel(filename string) ([]map[string]string, []string, error) {
    f, err := excelize.OpenFile(filename)
    if err != nil {
        return nil, nil, err
    }
    defer func() {
        if err := f.Close(); err != nil {
            log.Println("failed to close workbook:", err)
        }
    }()
    // Read all cells on Sheet1.
    rows, err := f.GetRows("Sheet1")
    if err != nil {
        return nil, nil, err
    }
    // Collect the rows and the distinct categories.
    allMap := make([]map[string]string, 0, len(rows))
    groupMap := make(map[string]string)
    groupSlice := make([]string, 0)
    for k, row := range rows {
        if k == 0 {
            continue // skip the header row
        }
        tmp := make(map[string]string)
        for kk, colCell := range row {
            switch kk {
            case 0:
                // First column: category; remember each distinct value once.
                if _, ok := groupMap[colCell]; !ok {
                    groupSlice = append(groupSlice, colCell)
                    groupMap[colCell] = colCell
                }
                tmp["catName"] = colCell
            case 1:
                // Second column: name; skip empty cells.
                if colCell == "" {
                    continue
                }
                tmp["name"] = colCell
            case 2:
                // Third column: content.
                tmp["content"] = colCell
            }
        }
        // Append once per row, not once per cell.
        allMap = append(allMap, tmp)
    }
    return allMap, groupSlice, nil
}
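
// exampleReadExcel is a minimal usage sketch for readExcel. It assumes a
// workbook named books.xlsx whose Sheet1 has a header row followed by rows of
// (category, name, content), as readExcel above expects; the function name is
// illustrative only.
func exampleReadExcel() {
    rows, groups, err := readExcel("books.xlsx")
    if err != nil {
        log.Println("read excel failed:", err)
        return
    }
    fmt.Println("categories:", strings.Join(groups, ", "))
    for _, r := range rows {
        fmt.Printf("%s / %s: %s\n", r["catName"], r["name"], r["content"])
    }
}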
func main() {
    /*rs, groupData, _ := readExcel("books.xlsx")
    fmt.Println(rs)
    fmt.Println(groupData)*/
    boolRs, msg, conn := conn()
    if !boolRs {
        fmt.Println(msg)
        return
    }
    // Produce
    //produce(conn)
    //time.Sleep(1 * time.Second)
    // Consume
    consumer(conn)
    time.Sleep(5 * time.Second)
}
// produce writes a few demo messages over the low-level connection and then
// closes it.
func produce(s *kafka.Conn) {
    _, err := s.WriteMessages(
        kafka.Message{Key: []byte("555"), Value: []byte("four!")},
        kafka.Message{Key: []byte("666"), Value: []byte("two!")},
        kafka.Message{Key: []byte("5"), Value: []byte("three!")},
    )
    if err != nil {
        log.Fatal("failed to write messages:", err)
    }
    if err := s.Close(); err != nil {
        log.Fatal("failed to close connection:", err)
    }
    log.Println("successfully wrote messages")
}
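
// produceWithWriter is an alternative sketch using kafka-go's higher-level
// kafka.Writer instead of the raw connection above. It assumes the same
// broker address and topic as conn(); the function name is illustrative only.
func produceWithWriter() {
    w := &kafka.Writer{
        Addr:     kafka.TCP("192.168.14.81:9092"),
        Topic:    "ny-yzt",
        Balancer: &kafka.LeastBytes{},
    }
    err := w.WriteMessages(context.Background(),
        kafka.Message{Key: []byte("555"), Value: []byte("four!")},
        kafka.Message{Key: []byte("666"), Value: []byte("two!")},
    )
    if err != nil {
        log.Println("failed to write messages:", err)
    }
    if err := w.Close(); err != nil {
        log.Println("failed to close writer:", err)
    }
}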
// consumer reads messages from the topic through a kafka.Reader (consumer
// group), printing and committing each one, then closes the raw connection.
func consumer(s *kafka.Conn) {
    config := kafka.ReaderConfig{
        Topic:          "ny-yzt",
        MinBytes:       10e3, // 10KB
        MaxBytes:       10e6, // 10MB
        CommitInterval: 1 * time.Second,
        Brokers:        []string{"192.168.14.81:9092"},
        // Partition must not be set together with GroupID; the group
        // coordinator assigns partitions to this consumer.
        GroupID: "consumer-group-id",
    }
    config.StartOffset = kafka.FirstOffset
    // Create the reader once, outside the loop, and close it when done.
    reader := kafka.NewReader(config)
    defer func() {
        if err := reader.Close(); err != nil {
            log.Println("failed to close reader:", err)
        }
    }()
    for {
        m, err := reader.FetchMessage(context.Background())
        if err != nil {
            break
        }
        fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
        if err := reader.CommitMessages(context.Background(), m); err != nil {
            fmt.Println("commit failed:", err.Error())
        }
    }
    // The raw connection from conn() is no longer needed.
    if err := s.Close(); err != nil {
        log.Fatal("failed to close connection:", err.Error())
    }
}
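
// consumeFromConn is a sketch of the other approach the loop above hinted at:
// reading straight from the low-level *kafka.Conn rather than a Reader.
// Offsets are not committed to a consumer group this way; the function name
// and the 3-second deadline are illustrative only.
func consumeFromConn(s *kafka.Conn) {
    // Stop blocking once the broker has nothing more to send.
    if err := s.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
        log.Println("failed to set read deadline:", err)
        return
    }
    for {
        m, err := s.ReadMessage(10e6) // max message size in bytes
        if err != nil {
            break // typically a timeout once the partition is drained
        }
        fmt.Printf("offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
    }
}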
// conn dials the leader of the topic/partition and sets a write deadline.
// It returns ok, an error message when ok is false, and the connection.
func conn() (bool, string, *kafka.Conn) {
    topic := "ny-yzt"
    partition := 0
    conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.14.81:9092", topic, partition)
    if err != nil {
        return false, "failed to dial leader: " + err.Error(), nil
    }
    if err = conn.SetWriteDeadline(time.Now().Add(1 * time.Second)); err != nil {
        return false, "failed to set write deadline: " + err.Error(), nil
    }
    return true, "", conn
}
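
// checkConn is a small sketch showing one way to sanity-check the connection
// returned by conn(): list the partitions of the topic over the same
// connection. The function name is illustrative only.
func checkConn(c *kafka.Conn) {
    parts, err := c.ReadPartitions("ny-yzt")
    if err != nil {
        log.Println("failed to read partitions:", err)
        return
    }
    for _, p := range parts {
        fmt.Printf("topic %s has partition %d\n", p.Topic, p.ID)
    }
}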
// ggg joins length elements of bufferArr, starting at offset, into a
// comma-separated string. The range is clamped to the bounds of the slice.
func ggg(bufferArr []string, offset int, length int) string {
    l := len(bufferArr)
    if offset < 0 || offset > l {
        return ""
    }
    end := offset + length
    if end > l {
        end = l
    }
    rs := bufferArr[offset:end]
    return strings.Join(rs, ",")
}
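
// exampleGgg demonstrates ggg with hypothetical input; with the clamping
// above, a length running past the end of the slice is truncated.
func exampleGgg() {
    parts := []string{"a", "b", "c", "d"}
    fmt.Println(ggg(parts, 1, 2)) // prints "b,c"
    fmt.Println(ggg(parts, 2, 9)) // length clamped: prints "c,d"
}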