代码拉取完成,页面将自动刷新
package yasrpc
import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net"
"net/http"
"os"
"sync"
"time"
"git.yasdb.com/go/yasrpc/log"
)
// TraClient provides external functions to transfer files
type TraClient struct {
Network string
Address string
TLSConfig *tls.Config
SecretKey string
IsHTTP bool
logger log.Logger
}
// transferClient is a transfer client to transfer files
type transferClient struct {
lock sync.Mutex
conn net.Conn
opt *Option
logger log.Logger
}
func NewTraClient(network, address string, logger log.Logger, tlsConfig *tls.Config, secretKey string, isHTTP bool) *TraClient {
if logger == nil {
logger = log.DefaultLogger
}
return &TraClient{
Network: network,
Address: address,
TLSConfig: tlsConfig,
SecretKey: secretKey,
IsHTTP: isHTTP,
logger: logger,
}
}
func (c *TraClient) UploadFile(ctx context.Context, fileTransferInfo *FileTransferInfo) error {
fileTransfer := &FileTransfer{
FileTransferInfo: *fileTransferInfo,
TransferType: UploadType,
}
tClient, err := transferDial(c.Network, c.Address, c.logger, c.TLSConfig, fileTransfer, c.SecretKey, c.IsHTTP)
if err != nil {
return err
}
defer tClient.Close()
return tClient.uploadFile(ctx)
}
func (c *TraClient) DownloadFile(ctx context.Context, fileTransferInfo *FileTransferInfo) error {
fileTransfer := &FileTransfer{
FileTransferInfo: *fileTransferInfo,
TransferType: DownloadType,
}
tClient, err := transferDial(c.Network, c.Address, c.logger, c.TLSConfig, fileTransfer, c.SecretKey, c.IsHTTP)
if err != nil {
return err
}
defer tClient.Close()
return tClient.downloadFile(ctx)
}
func newTransferClient(conn net.Conn, logger log.Logger, opt *Option) (*transferClient, error) {
fn := NewCodecFuncMap[opt.CodecType]
if fn == nil {
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
logger.Errorf("rpc client: codec err: %v", err)
return nil, err
}
// send options with server
if err := json.NewEncoder(conn).Encode(opt); err != nil {
logger.Errorf("rpc client: options err: %v", err)
_ = conn.Close()
return nil, err
}
if err := json.NewDecoder(conn).Decode(opt); err != nil {
logger.Errorf("rpc client: options err: %v", err)
_ = conn.Close()
return nil, err
}
return &transferClient{
conn: conn,
opt: opt,
logger: logger,
}, nil
}
func newHTTPTransferClient(conn net.Conn, logger log.Logger, opt *Option) (*transferClient, error) {
_, _ = io.WriteString(conn, fmt.Sprintf("CONNECT %s HTTP/1.0\n\n", DefaultRPCPath))
// Require successful HTTP response
// before switching to RPC protocol
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == ConnectedSuccess {
return newTransferClient(conn, logger, opt)
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
return nil, err
}
func transferDial(network, address string, logger log.Logger, tlsConfig *tls.Config, fileTransfer *FileTransfer, secretKey string, isHTTP bool) (*transferClient, error) {
opt, err := genOptions(fileTransfer, secretKey)
if err != nil {
logger.Error(err)
return nil, err
}
var conn net.Conn
var tlsConn *tls.Conn
if tlsConfig != nil {
dialer := &net.Dialer{
Timeout: opt.ConnectTimeout,
}
tlsConn, err = tls.DialWithDialer(dialer, network, address, tlsConfig)
conn = net.Conn(tlsConn)
} else {
conn, err = net.DialTimeout(network, address, opt.ConnectTimeout)
}
if err != nil {
logger.Errorf("failed to dial server: %v\n", err)
return nil, err
}
if isHTTP {
return newHTTPTransferClient(conn, logger, opt)
}
return newTransferClient(conn, logger, opt)
}
func genOptions(fileTransfer *FileTransfer, secretKey string) (*Option, error) {
if fileTransfer == nil {
err := fmt.Errorf("transfer dial error: file transfer is nil")
return nil, err
}
opt := &Option{
VerifiedNumber: VerifiedNumber,
CodecType: GobType,
SecretKey: secretKey,
ConnectTimeout: 10 * time.Second,
FileTransfer: fileTransfer,
}
return opt, nil
}
// Close the connection.
func (c *transferClient) Close() error {
c.lock.Lock()
defer c.lock.Unlock()
return c.conn.Close()
}
// UploadFile upload file to Server.
func (c *transferClient) uploadFile(ctx context.Context) error {
f, err := os.Open(c.opt.FileTransfer.LocalPath)
if err != nil {
c.logger.Errorf("client upload file: open file: %s error %v", c.opt.FileTransfer.LocalPath, err)
return err
}
defer f.Close()
fi, err := os.Stat(c.opt.FileTransfer.LocalPath)
if err != nil {
c.logger.Errorf("client upload file: stat file: %s error %v", c.opt.FileTransfer.LocalPath, err)
return err
}
c.logger.Infof("upload file: %s, size: %d", fi.Name(), fi.Size())
traErr := &TransferError{}
if err := json.NewEncoder(c.conn).Encode(traErr); err != nil {
c.logger.Errorf("rpc client: options err: %v", err)
return err
}
if err := json.NewDecoder(c.conn).Decode(traErr); err != nil {
c.logger.Errorf("rpc client: options err: %v", err)
return err
}
if traErr.Error != "" {
err := fmt.Errorf("tarsfer error: %s", traErr.Error)
c.logger.Error(err)
return err
}
buf := make([]byte, c.opt.FileTransfer.BlockSize)
c.logger.Debugf("upload file block size: %d", c.opt.FileTransfer.BlockSize)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
n, err := f.Read(buf)
if err != nil {
if err == io.EOF {
c.logger.Info("client upload file: send file end")
return nil
}
c.logger.Errorf("client upload file: file read err: %v", err)
return err
}
_, err = c.conn.Write(buf[:n])
if err != nil {
c.logger.Errorf("client upload file: conn write err: %v", err)
return err
}
}
}
}
// DownloadFile download file from Server.
func (c *transferClient) downloadFile(ctx context.Context) error {
f, err := os.OpenFile(c.opt.FileTransfer.LocalPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, fs.FileMode(c.opt.FileTransfer.LocalMode))
if err != nil {
c.logger.Errorf("client download file: open file: %s error %v", c.opt.FileTransfer.LocalPath, err)
return err
}
defer f.Close()
traErr := &TransferError{}
if err := json.NewEncoder(c.conn).Encode(traErr); err != nil {
c.logger.Errorf("rpc client: options err: %v", err)
return err
}
if err := json.NewDecoder(c.conn).Decode(traErr); err != nil {
c.logger.Errorf("rpc client: options err: %v", err)
return err
}
if err := json.NewEncoder(c.conn).Encode(traErr); err != nil {
c.logger.Errorf("rpc client: options err: %v", err)
return err
}
if traErr.Error != "" {
err := fmt.Errorf("transfer error: %s", traErr.Error)
c.logger.Error(err)
return err
}
buf := make([]byte, c.opt.FileTransfer.BlockSize)
fmt.Println("download file block size:", c.opt.FileTransfer.BlockSize)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
n, err := c.conn.Read(buf)
if err != nil {
if err == io.EOF {
c.logger.Info("client download file: upload EOF end")
return nil
}
c.logger.Errorf("client download file: conn read err: %v", err)
return err
}
_, err = f.Write(buf[:n])
if err != nil {
c.logger.Errorf("write file err: %v", err)
return err
}
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。