1 Star 0 Fork 0

1111听听风儿的声音/learn_go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main.go 3.89 KB
一键复制 编辑 原始数据 按行查看 历史
1111听听风儿的声音 提交于 2022-08-29 18:02 +08:00 . init
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/xuri/excelize/v2"
"log"
"strings"
"time"
)
// 读取数据
func readExcel(filename string) (interface{}, []string, error) {
f, err := excelize.OpenFile("books.xlsx")
if err != nil {
return nil, nil, err
}
defer func() {
if err := f.Close(); err != nil {
}
}()
// 获取 Sheet1 上所有单元格
rows, err := f.GetRows("Sheet1")
if err != nil {
return nil, nil, err
}
//获取分组
allMap := make([]map[string]string, 0)
groupMap := make(map[string]string, 0)
groupSlice := make([]string, 0)
for k, row := range rows {
var tmp = make(map[string]string, 0)
if k > 0 {
for kk, colCell := range row {
//fmt.Print(colCell, "\t")
if kk == 0 {
if _, ok := groupMap[colCell]; !ok {
groupSlice = append(groupSlice, colCell)
groupMap[colCell] = colCell
}
tmp["catName"] = colCell
}
if kk == 1 ||colCell==""{
continue
tmp["name"] = colCell
}
if kk == 2 {
tmp["content"] = colCell
}
allMap = append(allMap, tmp)
}
}
}
return allMap, groupSlice, nil
}
func main() {
/*rs, groupData, _ := readExcel("")
fmt.Println(rs)
fmt.Println(groupData)*/
//fmt.Println(conn())
boolRs,msg,conn:=conn()
if !boolRs {
fmt.Println(msg)
return
}
// 生产
//produce(conn)
//time.Sleep(time.Duration(1)*time.Second)
// 消费
consumer(conn)
time.Sleep(time.Duration(5)*time.Second)
}
// 生产者
func produce(s *kafka.Conn) {
_, err := s.WriteMessages(
kafka.Message{
Key: []byte("555"),
Value: []byte("four!")},
kafka.Message{Key: []byte("666"),Value: []byte("two!")},
kafka.Message{Key: []byte("5"),Value: []byte("three!")},
)
//fmt.Println("谢金数据")
if err != nil {
log.Fatal("failed to write messages:", err.Error())
}
if err := s.Close(); err != nil {
log.Fatal("failed to close writer:", err.Error())
}
log.Println("successful to write messages:")
}
// 消费
func consumer(s *kafka.Conn) {
config := kafka.ReaderConfig{
Topic: "ny-yzt",
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: 1*time.Second,
Brokers: []string{"192.168.14.81:9092"},
Partition: 0,
GroupID: "consumer-group-id",
}
config.StartOffset =kafka.FirstOffset
//config.se
for {
consumer := kafka.NewReader(config)
m, err := s.ReadMessage(1e6)
//m,err:=consumer.FetchMessage(context.Background())
fmt.Printf(string([]byte(m.Value)))
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
if err:=consumer.CommitMessages(context.Background(),m);err!=nil {
fmt.Println("rs结果:",err.Error())
}
}
//fmt.Printf("66666")
if err := s.Close(); err != nil {
log.Fatal("failed to close connection:", err.Error())
}
}
//连接
func conn() (bool,string,*kafka.Conn) {
topic:="ny-yzt"
partition:=0
conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.14.81:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
err=conn.SetWriteDeadline(time.Now().Add(1*time.Second))
if err!=nil {
fmt.Println("===8888===",err)
return false,err.Error(),&kafka.Conn{}
}
//fmt.Println("===999===",conn.Broker())
return true,"",conn
/*_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}*/
}
// buffer 数组 offer 开始位置 len 数据长度
//返回字符串
func ggg(bufferArr []string, offset int, lenth int) string {
// 获取数组长度
l:= len(bufferArr)
if lenth>=l {
lenth=l
}
//开始循环拼接
rs:= bufferArr[offset:l]
/*for i:=0;i<l;i++{
bufferArr
}*/
return strings.Join(rs,",")
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/nbvc/learn_go.git
git@gitee.com:nbvc/learn_go.git
nbvc
learn_go
learn_go
master

搜索帮助