1 Star 0 Fork 0

rocket049/gobstream

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
autoreader.go 3.38 KB
一键复制 编辑 原始数据 按行查看 历史
package gobstream
import (
"bytes"
"io/fs"
"log"
"os"
"path/filepath"
)
// StandaloneData 经过处理后的完整数据的统一数据结构
type StandaloneData struct {
Typ string // 可能有三种值:file/byte
Session uint32 // 用数字代表不同的通道
FileName string // 文件完整路径,对应 file 类型
Bytes []byte // 二进制内容, 对应 byte 类型
Ext uint32 // 供程序自定义的扩展信息
}
// 对应 StandaloneData 的两种数据类型的常量
const (
DataTypeFile = "file"
DataTypeByte = "byte"
)
// BeginReader() 返回一个管道,
// 调用该方法后,程序开始在后台解析接收到的数据包,
// 把数据还原为 byte[] 数据或文件,
// 用户可以反复从返回的管道读取 StandaloneData 数据。
func (stream *StreamConn) BeginReader() (res chan *StandaloneData, err error) {
var files = make(map[uint32]*os.File)
var datas = make(map[uint32]*bytes.Buffer)
res = make(chan *StandaloneData, 1)
go func() {
for {
data, err := stream.Recv()
if err != nil {
close(res)
return
}
switch data.Typ {
case TypFileHead:
// recv file header with session
tmp := os.TempDir()
fp, err := os.Create(filepath.Join(tmp, data.Name))
if err != nil {
log.Println(err.Error())
} else {
fp.Chmod(fs.FileMode(data.Mode))
files[data.Session] = fp
}
continue
case TypFileData:
yes, end, data1, err := data.GetFileData()
if end {
// file with session is end
dataMerged := &StandaloneData{
Typ: DataTypeFile,
Session: data.Session,
FileName: files[data.Session].Name(),
Ext: data.Ext,
Bytes: nil,
}
err := files[data.Session].Close()
if err != nil {
log.Println("filedata 1:" + err.Error())
} else {
res <- dataMerged
}
delete(files, data.Session)
} else if yes {
// recv data of file with session
if err != nil {
log.Println("filedata 2:" + err.Error())
} else {
_, err = files[data.Session].Write(data1)
if err != nil {
log.Println("filedata 3:" + err.Error())
}
}
}
continue
case TypStream:
s, err := data.GetBytes()
if err != nil {
log.Println(err.Error())
} else if data.Status == StatusAlone {
// recv alone data with session
//err = stream.SendString(data.Session, "Reply:"+s, false)
dataMerged := &StandaloneData{
Typ: DataTypeByte,
Session: data.Session,
FileName: "",
Ext: data.Ext,
Bytes: s,
}
res <- dataMerged
//if err != nil {
//log.Println(err.Error())
//}
} else if data.Status == StatusStart {
// recv first block of multi part data with session
datas[data.Session] = bytes.NewBuffer(s)
} else if data.Status == StatusEnd {
// end multi part data with session
// err = stream.SendString(data.Session, "Reply:"+datas[data.Session].String(), false)
// if err != nil {
// log.Println(err.Error())
// }
dataMerged := &StandaloneData{
Typ: DataTypeByte,
Session: data.Session,
FileName: "",
Ext: data.Ext,
Bytes: datas[data.Session].Bytes(),
}
res <- dataMerged
delete(datas, data.Session)
} else if data.Status == StatusEmpty {
datas[data.Session].Write(s)
}
continue
}
}
}()
return
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/rocket049/gobstream.git
git@gitee.com:rocket049/gobstream.git
rocket049
gobstream
gobstream
master

搜索帮助