Ai
7 Star 6 Fork 2

bandl/wheatClient

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
maxFile.go 5.65 KB
一键复制 编辑 原始数据 按行查看 历史
bandl 提交于 2021-05-22 17:15 +08:00 . 完成多线程大文件流下载
package wheatClient
import (
"errors"
"fmt"
"github.com/timedb/wheatDFS/app"
"github.com/timedb/wheatDFS/etc"
"github.com/timedb/wheatDFS/fileKeyTorch"
"io"
)
var (
HashErr = errors.New("the given hash error")
TokenErr = errors.New("token")
MaxFileTransErr = errors.New("this method only handles large files. For small files, use the Client interface")
UpLoadStateErr = errors.New("this token is not an upload type token")
DownloadStateErr = errors.New("this token is not an download type token")
SwitchErr = errors.New("only upload mode can be switched to download mode")
)
const (
download = 1
upload = 2
workEnd = 3
defaultOffset = -9
//最大下载协程数
proto = 6
)
// FileTransferManager 大文件文件上传器
type FileTransferManager struct {
offset int64
Hash string
Token string
state int
Ext string
host *etc.Addr //服务地址
//上传用结构体
uploadReq *app.StoUploadMaxFileReq
uploadResp *app.StoUploadMaxFileResp
//下载用结构体
downloadResp *app.StoGetMaxFileResp
downloadReq *app.StoGetMaxFileReq
}
//开始传输
func (f *FileTransferManager) startUpload() error {
req := app.MakeStoUploadMaxFileReq()
resp := new(app.StoUploadMaxFileResp)
req.TransferStatus = app.UpStart
req.Hash = f.Hash
req.Ext = f.Ext
//请求
err := req.Do(f.host, resp)
if err != nil {
return err
}
if !resp.Successful() {
return errors.New(resp.Err)
}
f.offset = resp.Offset //修改偏移
f.uploadReq = req
f.uploadResp = resp
return nil
}
// Upload 大文件上传接口
func (f *FileTransferManager) Upload(buf []byte) error {
if f.state != upload {
return UpLoadStateErr
}
if f.offset == defaultOffset {
err := f.startUpload()
if err != nil {
return err
}
}
//构建传输
f.uploadReq.Offset = f.offset
f.uploadReq.TransferStatus = app.UpSustain
f.uploadReq.Content = buf
err := f.uploadReq.Do(f.host, f.uploadResp)
if err != nil {
return err
}
if !f.uploadResp.Successful() {
return errors.New(f.uploadResp.Err)
}
f.offset = f.uploadResp.Offset //修改偏移
return nil
}
// UploadEnd 传输完成
func (f *FileTransferManager) UploadEnd() (string, error) {
if f.state != upload {
return "", UpLoadStateErr
}
f.uploadReq.TransferStatus = app.UpEnd
err := f.uploadReq.Do(f.host, f.uploadResp)
if err != nil {
return "", err
}
if !f.uploadResp.Successful() {
return "", errors.New(f.uploadResp.Err)
}
f.state = workEnd
return f.uploadResp.Token, nil
}
func (f *FileTransferManager) DownLoad() ([]byte, error) {
if f.state != download {
return nil, DownloadStateErr
}
// 下载
f.downloadReq.Offset = f.offset
err := f.downloadReq.Do(f.host, f.downloadResp)
if f.downloadReq.Offset == -1 {
f.state = workEnd
return nil, io.EOF
}
if err != nil {
return nil, err
}
if !f.downloadResp.Successful() {
return nil, errors.New(f.uploadResp.Err)
}
f.offset = f.downloadResp.Offset
return f.downloadResp.Content, err
}
// SwitchToDownloadMode 切换到下载模式
func (f *FileTransferManager) SwitchToDownloadMode() error {
if f.state != upload {
return SwitchErr
}
f.state = download
f.downloadResp = new(app.StoGetMaxFileResp)
f.downloadReq = app.MakeGetMaxFile(f.Token)
f.offset = 0
f.uploadReq = nil
f.uploadResp = nil
host, err := sysClient.getEsoOkHost(f.Token)
if err != nil {
return err
}
f.host = host
return nil
}
// MakeFileTransferManagerByHash 创建文件令牌, 使用hash
func MakeFileTransferManagerByHash(Hash string, Ext string) (*FileTransferManager, error) {
fk := fileKeyTorch.MakeFileKeyByHash(Hash, Ext)
if fk == nil {
return nil, HashErr
}
if fk.Types() != fileKeyTorch.MaxFile {
return nil, MaxFileTransErr
}
host, err := sysClient.GetStorageAddr()
if err != nil {
return nil, err
}
file := new(FileTransferManager)
file.Token = fk.GeyToken()
file.Hash = Hash
file.offset = defaultOffset
file.state = upload
file.Ext = fk.Ext()
file.host = host
file.uploadReq = new(app.StoUploadMaxFileReq)
file.uploadResp = new(app.StoUploadMaxFileResp)
return file, nil
}
// MakeFileTransferManagerByToken 创建下载令牌使用token
func MakeFileTransferManagerByToken(token string) (*FileTransferManager, error) {
fk := fileKeyTorch.MakeFileKeyByToken(token)
if fk == nil {
return nil, TokenErr
}
file := new(FileTransferManager)
file.Token = fk.GeyToken()
host, err := sysClient.getEsoOkHost(file.Token)
if err != nil {
return nil, err
}
file.offset = 0
file.state = download
file.Ext = fk.Ext()
file.host = host
file.downloadReq = app.MakeGetMaxFile(token)
file.downloadResp = new(app.StoGetMaxFileResp)
return file, nil
}
// downProto 高速下载器
type downProto struct {
buf chan []byte //高速下载器
ofCont int64
offset int64
token string
req *app.StoGetMaxFileReq
resp *app.StoGetMaxFileResp
_close bool
}
// 获取管道数据
func (d *downProto) getBuf() ([]byte, bool) {
buf, ok := <-d.buf
return buf, ok
}
// 开始下载
func (d *downProto) start(initOffice int64, token string, host *etc.Addr) {
//开启下载协程
d.req = app.MakeGetMaxFile(d.token)
d.resp = new(app.StoGetMaxFileResp)
d.buf = make(chan []byte, 5)
d.offset = initOffice
d.token = token
d.ofCont = proto
d._close = false
d.req.Token = d.token
go func() {
for true {
d.req.Offset = d.offset //更新变化
err := d.req.Do(host, d.resp)
if err != nil { //错误关闭管道
close(d.buf)
fmt.Println(1, err)
return
}
if !d.resp.Successful() {
close(d.buf)
return
}
if d.resp.Offset == -1 { //结束
close(d.buf)
return
}
d.buf <- d.resp.Content
d.offset += d.ofCont //更新偏移
}
}()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/timedb/wheatClient.git
git@gitee.com:timedb/wheatClient.git
timedb
wheatClient
wheatClient
2.1.1

搜索帮助