代码拉取完成,页面将自动刷新
/******************************************************************************
* 版权信息:北京人大金仓信息技术股份有限公司
* 作者:KingbaseES
* 文件名:conn_go18.go
* 功能描述:对database/sql相关接口的实现
* 其它说明:
* 修改记录:
1.修改时间:
2.修改人:
3.修改内容:
******************************************************************************/
package gokb
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"io"
"io/ioutil"
"time"
"reflect"
)
type converter struct{}
func IsValue(v interface{}) bool {
if v == nil {
return true
}
switch v.(type) {
case int, int64, int32, int16, int8://有符号整型
return true
case uint, uint64, uint32, uint16, uint8://无符号整型
return true
case []uint8://bytea
return true
case float64, float32://浮点型
return true
case bool://布尔类型
return true
case string://字符串类型
return true
case time.Time://时间类型
return true
case CursorString://KB自定义游标类型
return true
case KbNumeric://KB自定义numeric类型
return true
}
return false
}
//实现"CheckNamedValue"接口中的"ConvertValue"函数
func (c converter) ConvertValue(v interface{}) (driver.Value, error) {
//判断是否为支持的in类型参数
if IsValue(v) {
return v, nil
}
//判断是否为out类型参数
var sBind bindStruct
sBind.out, sBind.isOut = v.(sql.Out)
switch sBind.out.Dest.(type) {
case *int, *int64, *int32, *int16, *int8:
return v, nil
case *uint, *uint64, *uint32, *uint16, *uint8:
return v, nil
case *[]uint8:
return v, nil
case *float64, *float32:
return v, nil
case *bool:
return v, nil
case *string:
return v, nil
case *time.Time:
return v, nil
case *CursorString, *KbNumeric:
return v, nil
}
//其它in类型参数
rv := reflect.ValueOf(v)
switch rv.Kind() {
case reflect.Pointer:
// indirect pointers
if rv.IsNil() {
return nil, nil
} else {
return c.ConvertValue(rv.Elem().Interface())
}
case reflect.Slice:
ek := rv.Type().Elem().Kind()
if ek == reflect.Uint8 {
return rv.Bytes(), nil
}
return nil, fmt.Errorf("unsupported type %T, a slice of %s", v, ek)
}
return nil, fmt.Errorf("kb unsupported type %T, a %s", v, rv.Kind())
}
// 实现"CheckNamedValue"接口
func (cn *conn) CheckNamedValue(nv *driver.NamedValue) (err error) {
nv.Value, err = converter{}.ConvertValue(nv.Value)
return
}
// 实现"QueryerContext"接口
func (cn *conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
var newArgs []driver.Value
query, newArgs, _ = replaceHolderMarkers(query, nil, args)
finish := cn.watchCancel(ctx)
r, err := cn.query(query, newArgs)
if err != nil {
if finish != nil {
finish()
}
return nil, err
}
r.finish = finish
return r, nil
}
func (st *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
finish := st.cn.watchCancel(ctx)
//if finish := st.cn.watchCancel(ctx); finish != nil {
// defer finish()
//}
var newArgs []driver.Value
st.queryName, newArgs, st.nameList = replaceHolderMarkers(st.queryName, st.nameList, args)
r, err := st.Query(newArgs)
if err != nil {
if finish != nil {
finish()
}
return nil, err
}
return r, err
}
//实现"ExecerContext"接口
func (cn *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
var newArgs []driver.Value
query, newArgs, _ = replaceHolderMarkers(query, nil, args)
if finish := cn.watchCancel(ctx); finish != nil {
defer finish()
}
return cn.Exec(query, newArgs)
}
func (st *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
var newArgs []driver.Value
st.queryName, newArgs, st.nameList = replaceHolderMarkers(st.queryName, st.nameList, args)
if finish := st.cn.watchCancel(ctx); finish != nil {
defer finish()
}
return st.Exec(newArgs)
}
// 实现"ConnBeginTx"接口
func (cn *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
var mode string
switch sql.IsolationLevel(opts.Isolation) {
case sql.LevelDefault:
// Don't touch mode: use the server's default
case sql.LevelReadUncommitted:
mode = " ISOLATION LEVEL READ UNCOMMITTED"
case sql.LevelReadCommitted:
mode = " ISOLATION LEVEL READ COMMITTED"
case sql.LevelRepeatableRead:
mode = " ISOLATION LEVEL REPEATABLE READ"
case sql.LevelSerializable:
mode = " ISOLATION LEVEL SERIALIZABLE"
default:
return nil, fmt.Errorf("kb: isolation level not supported: %d", opts.Isolation)
}
if opts.ReadOnly {
mode += " READ ONLY"
} else {
mode += " READ WRITE"
}
tx, err := cn.begin(mode)
if err != nil {
return nil, err
}
cn.txnFinish = cn.watchCancel(ctx)
return tx, nil
}
func (cn *conn) Ping(ctx context.Context) error {
if finish := cn.watchCancel(ctx); finish != nil {
defer finish()
}
rows, err := cn.simpleQuery(";")
if err != nil {
return driver.ErrBadConn
}
rows.Close()
return nil
}
func (cn *conn) watchCancel(ctx context.Context) func() {
if done := ctx.Done(); done != nil {
finished := make(chan struct{})
go func() {
select {
case <-done:
// 在此处函数级的上下文被取消,因此它不能用于额外的网络请求来取消查询
// 需要创建一个新的上下文并传给dial
ctxCancel, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
_ = cn.cancel(ctxCancel)
finished <- struct{}{}
case <-finished:
}
}()
return func() {
select {
case <-finished:
case finished <- struct{}{}:
}
}
}
return nil
}
func (cn *conn) cancel(ctx context.Context) error {
c, err := dial(ctx, cn.dialer, cn.opts)
if err != nil {
return err
}
defer c.Close()
{
can := conn{
c: c,
}
err = can.ssl(cn.opts)
if err != nil {
return err
}
w := can.writeBuf(0)
w.int32(80877102) //取消请求代码
w.int32(cn.processID)
w.int32(cn.secretKey)
if err := can.sendStartupPacket(w); err != nil {
return err
}
}
// 读取数据直到读到EOF以确保服务端收到了cancel请求
{
_, err := io.Copy(ioutil.Discard, c)
return err
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。