代码拉取完成,页面将自动刷新
// @Title gob流式数据传输和文件传输
// @Description 利用gob编码,在一个TCP连接中同时传输多个通道的数据或文件。
// @Author 傅惠忠<fuhuizn@163.com> 2024-3-10
// @Update 傅惠忠<fuhuizn@163.com> 2024-3-10
package gobstream
import (
"bytes"
"compress/gzip"
"encoding/gob"
"errors"
"io"
"os"
"path/filepath"
)
// StreamData 用于网络传输的统一数据结构
type StreamData struct {
Typ string //可能有三种值:file/filedata/steamdata
Session uint32 //用数字代表不同的通道
Status string //控制当前通道的状态
Name string //文件名,仅用于传输文件
Mode uint32 //文件属性,等同于fs.FileMode,仅用于传输文件
Format string //代表该数据是否压缩,仅可能有两种值:raw/gz
Data []byte //数据本身
Ext uint32 //供程序自定义的扩展信息
}
const (
// status
StatusAlone = "alone"
StatusStart = "start"
StatusEnd = "end"
StatusEmpty = ""
// type
TypStream = "streamdata"
TypFileHead = "file"
TypFileData = "filedata"
)
// StreamConn 代表一个连接的结构体
type StreamConn struct {
Conn io.ReadWriteCloser
Enc *gob.Encoder
Dec *gob.Decoder
}
// NewStreamConn 从一个TCP连接创建新的StreamConn对象
func NewStreamConn(conn io.ReadWriteCloser) *StreamConn {
return &StreamConn{Conn: conn, Enc: gob.NewEncoder(conn), Dec: gob.NewDecoder(conn)}
}
// Close 关闭连接
func (p *StreamConn) Close() {
p.Conn.Close()
}
// Send 发送一个 StreamData
func (p *StreamConn) Send(data *StreamConn) error {
return p.Enc.Encode(data)
}
// Recv 接收一个 StreamData
func (p *StreamConn) Recv() (*StreamData, error) {
data := &StreamData{}
err := p.Dec.Decode(data)
return data, err
}
// dataGzip 用gzip算法压缩数据data。
func dataGzip(data []byte) ([]byte, error) {
buf := bytes.NewBufferString("")
wr := gzip.NewWriter(buf)
_, err := wr.Write(data)
if err != nil {
return nil, err
}
err = wr.Flush()
if err != nil {
return nil, err
}
err = wr.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// dataGunzip 用gzip算法解压数据data。
func dataGunzip(data []byte) ([]byte, error) {
buf := bytes.NewBuffer(data)
wr, err := gzip.NewReader(buf)
if err != nil {
return nil, err
}
defer wr.Close()
res := bytes.NewBufferString("")
_, err = io.Copy(res, wr)
if err != nil {
return nil, err
}
return res.Bytes(), nil
}
// SendString 用指定的session发送单独的字符串s,zipped参数的代表是否压缩。
func (p *StreamConn) SendString(session uint32, s string, zipped bool) error {
data := &StreamData{
Typ: TypStream,
Status: "alone",
Session: session,
}
if zipped {
zdata, err := dataGzip([]byte(s))
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = []byte(s)
}
return p.Enc.Encode(data)
}
// SendStringWithExt 用指定的session发送单独的字符串s,zipped参数的代表是否压缩,ext代表程序自定义信息。
func (p *StreamConn) SendStringWithExt(session uint32, s string, zipped bool, ext uint32) error {
data := &StreamData{
Typ: TypStream,
Status: "alone",
Session: session,
Ext: ext,
}
if zipped {
zdata, err := dataGzip([]byte(s))
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = []byte(s)
}
return p.Enc.Encode(data)
}
// SendBytes 用指定的session发送二进制数据buf,zipped参数的代表是否压缩。
func (p *StreamConn) SendBytes(session uint32, buf []byte, zipped bool) error {
data := &StreamData{
Typ: TypStream,
Status: "alone",
Session: session,
}
if zipped {
zdata, err := dataGzip(buf)
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = buf
}
return p.Enc.Encode(data)
}
// SendBytesWithExt 用指定的session发送二进制数据buf,zipped参数的代表是否压缩,ext代表程序自定义信息。
func (p *StreamConn) SendBytesWithExt(session uint32, buf []byte, zipped bool, ext uint32) error {
data := &StreamData{
Typ: TypStream,
Status: "alone",
Session: session,
Ext: ext,
}
if zipped {
zdata, err := dataGzip(buf)
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = buf
}
return p.Enc.Encode(data)
}
// SendBytesHead 用指定的session发送分段二进制数据的头部数据buf,zipped参数的代表是否压缩。
func (p *StreamConn) SendBytesHead(session uint32, buf []byte, zipped bool) error {
data := &StreamData{
Typ: TypStream,
Status: "start",
Session: session,
}
if zipped {
zdata, err := dataGzip(buf)
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = buf
}
return p.Enc.Encode(data)
}
// SendBytesHeadWithExt 用指定的session发送分段二进制数据的头部数据buf,zipped参数的代表是否压缩,ext代表程序自定义信息。
func (p *StreamConn) SendBytesHeadWithExt(session uint32, buf []byte, zipped bool, ext uint32) error {
data := &StreamData{
Typ: TypStream,
Status: "start",
Session: session,
Ext: ext,
}
if zipped {
zdata, err := dataGzip(buf)
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = buf
}
return p.Enc.Encode(data)
}
// SendBytesBody 用指定的session发送分段二进制数据的数据块buf,zipped参数的代表是否压缩。
func (p *StreamConn) SendBytesBody(session uint32, buf []byte, zipped bool) error {
data := &StreamData{
Typ: TypStream,
Status: "",
Session: session,
}
if zipped {
zdata, err := dataGzip(buf)
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = buf
}
return p.Enc.Encode(data)
}
// SendBytesBodyWithExt 用指定的session发送分段二进制数据的数据块buf,zipped参数的代表是否压缩,ext代表程序自定义信息。
func (p *StreamConn) SendBytesBodyWithExt(session uint32, buf []byte, zipped bool, ext uint32) error {
data := &StreamData{
Typ: TypStream,
Status: "",
Session: session,
Ext: ext,
}
if zipped {
zdata, err := dataGzip(buf)
if err != nil {
return err
}
data.Format = "gz"
data.Data = zdata
} else {
data.Format = "raw"
data.Data = buf
}
return p.Enc.Encode(data)
}
// SendBytesEnd 用指定的session发送分段二进制数据的尾部,代表该数据已经传输结束。
func (p *StreamConn) SendBytesEnd(session uint32) error {
data := &StreamData{
Typ: TypStream,
Status: "end",
Session: session,
}
return p.Enc.Encode(data)
}
// SendBytesEndWithExt 用指定的session发送分段二进制数据的尾部,代表该数据已经传输结束,ext代表程序自定义信息。
func (p *StreamConn) SendBytesEndWithExt(session uint32, ext uint32) error {
data := &StreamData{
Typ: TypStream,
Status: "end",
Session: session,
Ext: ext,
}
return p.Enc.Encode(data)
}
// SendFile 用指定的session发送文件pathname,数据将会用gz算法压缩
func (p *StreamConn) SendFile(session uint32, pathname string) error {
name := filepath.Base(pathname)
info, err := os.Stat(pathname)
if err != nil {
return err
}
if info.IsDir() {
return errors.New("SendFile can not send a directory.")
}
fp, err := os.Open(pathname)
if err != nil {
return err
}
defer fp.Close()
header := &StreamData{
Typ: TypFileHead,
Name: name,
Mode: uint32(info.Mode()),
Session: session,
}
err = p.Enc.Encode(header)
if err != nil {
return err
}
data := make([]byte, 4096)
for {
n, err := fp.Read(data)
if err != nil || n == 0 {
break
}
//发送文件数据块
body := &StreamData{
Typ: TypFileData,
Session: session,
}
gzdata, err := dataGzip(data[0:n])
if err != nil {
body.Format = "raw"
body.Data = data[0:n]
} else {
body.Format = "gz"
body.Data = gzdata
}
err = p.Enc.Encode(body)
if err != nil {
return err
}
}
//文件尾
body := &StreamData{
Typ: TypFileData,
Session: session,
Status: "end",
}
return p.Enc.Encode(body)
}
// SendFileWithExt 用指定的session发送文件pathname,数据将会用gz算法压缩,ext代表程序自定义信息
func (p *StreamConn) SendFileWithExt(session uint32, pathname string, ext uint32) error {
name := filepath.Base(pathname)
info, err := os.Stat(pathname)
if err != nil {
return err
}
if info.IsDir() {
return errors.New("SendFile can not send a directory.")
}
fp, err := os.Open(pathname)
if err != nil {
return err
}
defer fp.Close()
header := &StreamData{
Typ: TypFileHead,
Name: name,
Mode: uint32(info.Mode()),
Session: session,
Ext: ext,
}
err = p.Enc.Encode(header)
if err != nil {
return err
}
data := make([]byte, 4096)
for {
n, err := fp.Read(data)
if err != nil || n == 0 {
break
}
//发送文件数据块
body := &StreamData{
Typ: TypFileData,
Session: session,
Ext: ext,
}
gzdata, err := dataGzip(data[0:n])
if err != nil {
body.Format = "raw"
body.Data = data[0:n]
} else {
body.Format = "gz"
body.Data = gzdata
}
err = p.Enc.Encode(body)
if err != nil {
return err
}
}
//文件尾
body := &StreamData{
Typ: TypFileData,
Session: session,
Status: "end",
Ext: ext,
}
return p.Enc.Encode(body)
}
// IsFileHeader 判断一个StreamData是不是文件头。
func (p *StreamData) IsFileHeader() bool {
if p.Typ == TypFileHead {
return true
}
return false
}
// GetFileData 读取一个文件数据快类型的StreamData中的数据,
// 返回值yes-表示是不是一个文件数据段,end-表示传输是否到达尾部(尾部块不包含文件数据),data-表示数据,er-r如果没有错误返回nil
func (p *StreamData) GetFileData() (yes bool, end bool, data []byte, err error) {
if p.Typ != TypFileData {
return false, false, nil, nil
}
if p.Status == "end" {
return true, true, nil, nil
}
if p.Format == "gz" {
data1, err := dataGunzip(p.Data)
if err != nil {
return true, false, nil, err
}
return true, false, data1, nil
}
return true, false, p.Data, nil
}
// GetString 将数据快解码为字符串返回
func (p *StreamData) GetString() (string, error) {
if p.Status == "end" {
return "", nil
}
if p.Format == "gz" {
data1, err := dataGunzip(p.Data)
if err != nil {
return "", err
}
return string(data1), nil
}
return string(p.Data), nil
}
// GetBytes 将数据快解码为[]byte类型返回
func (p *StreamData) GetBytes() ([]byte, error) {
if p.Status == "end" {
return nil, nil
}
if p.Format == "gz" {
data1, err := dataGunzip(p.Data)
if err != nil {
return nil, err
}
return data1, nil
}
return p.Data, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。