代码拉取完成,页面将自动刷新
package gobstream
import (
"bytes"
"io/fs"
"log"
"os"
"path/filepath"
)
// StandaloneData 经过处理后的完整数据的统一数据结构
type StandaloneData struct {
Typ string // 可能有三种值:file/byte
Session uint32 // 用数字代表不同的通道
FileName string // 文件完整路径,对应 file 类型
Bytes []byte // 二进制内容, 对应 byte 类型
Ext uint32 // 供程序自定义的扩展信息
}
// 对应 StandaloneData 的两种数据类型的常量
const (
DataTypeFile = "file"
DataTypeByte = "byte"
)
// BeginReader() 返回一个管道,
// 调用该方法后,程序开始在后台解析接收到的数据包,
// 把数据还原为 byte[] 数据或文件,
// 用户可以反复从返回的管道读取 StandaloneData 数据。
func (stream *StreamConn) BeginReader() (res chan *StandaloneData, err error) {
var files = make(map[uint32]*os.File)
var datas = make(map[uint32]*bytes.Buffer)
res = make(chan *StandaloneData, 1)
go func() {
for {
data, err := stream.Recv()
if err != nil {
close(res)
return
}
switch data.Typ {
case TypFileHead:
// recv file header with session
tmp := os.TempDir()
fp, err := os.Create(filepath.Join(tmp, data.Name))
if err != nil {
log.Println(err.Error())
} else {
fp.Chmod(fs.FileMode(data.Mode))
files[data.Session] = fp
}
continue
case TypFileData:
yes, end, data1, err := data.GetFileData()
if end {
// file with session is end
dataMerged := &StandaloneData{
Typ: DataTypeFile,
Session: data.Session,
FileName: files[data.Session].Name(),
Ext: data.Ext,
Bytes: nil,
}
err := files[data.Session].Close()
if err != nil {
log.Println("filedata 1:" + err.Error())
} else {
res <- dataMerged
}
delete(files, data.Session)
} else if yes {
// recv data of file with session
if err != nil {
log.Println("filedata 2:" + err.Error())
} else {
_, err = files[data.Session].Write(data1)
if err != nil {
log.Println("filedata 3:" + err.Error())
}
}
}
continue
case TypStream:
s, err := data.GetBytes()
if err != nil {
log.Println(err.Error())
} else if data.Status == StatusAlone {
// recv alone data with session
//err = stream.SendString(data.Session, "Reply:"+s, false)
dataMerged := &StandaloneData{
Typ: DataTypeByte,
Session: data.Session,
FileName: "",
Ext: data.Ext,
Bytes: s,
}
res <- dataMerged
//if err != nil {
//log.Println(err.Error())
//}
} else if data.Status == StatusStart {
// recv first block of multi part data with session
datas[data.Session] = bytes.NewBuffer(s)
} else if data.Status == StatusEnd {
// end multi part data with session
// err = stream.SendString(data.Session, "Reply:"+datas[data.Session].String(), false)
// if err != nil {
// log.Println(err.Error())
// }
dataMerged := &StandaloneData{
Typ: DataTypeByte,
Session: data.Session,
FileName: "",
Ext: data.Ext,
Bytes: datas[data.Session].Bytes(),
}
res <- dataMerged
delete(datas, data.Session)
} else if data.Status == StatusEmpty {
datas[data.Session].Write(s)
}
continue
}
}
}()
return
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。