1 Star 0 Fork 0

github_repo/trino-golang-client

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
trino.go 44.47 KB
一键复制 编辑 原始数据 按行查看 历史
painsOnline 提交于 2022-01-06 17:51 +08:00 . Add files via upload
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file contains code that was borrowed from prestgo, mainly some
// data type definitions.
//
// See https://github.com/avct/prestgo for copyright information.
//
// The MIT License (MIT)
//
// Copyright (c) 2015 Avocet Systems Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
// Package trino provides a database/sql driver for Trino.
//
// The driver should be used via the database/sql package:
//
// import "database/sql"
// import _ "github.com/trinodb/trino-go-client/trino"
//
// dsn := "http://user@localhost:8080?catalog=default&schema=test"
// db, err := sql.Open("trino", dsn)
//
package trino
import (
"context"
"crypto/tls"
"crypto/x509"
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
"unicode"
"gopkg.in/jcmturner/gokrb5.v6/client"
"gopkg.in/jcmturner/gokrb5.v6/config"
"gopkg.in/jcmturner/gokrb5.v6/keytab"
)
// init registers this driver with database/sql under the name "trino", so
// that sql.Open("trino", dsn) resolves to it.
func init() {
	sql.Register("trino", new(sqldriver))
}
// Package-level timeouts and sentinel errors. They are declared as variables,
// so the timeouts can be reassigned by callers.
var (
// DefaultQueryTimeout is the default timeout for queries executed without a context.
// roundTrip uses it as the HTTP client timeout when the context has no deadline.
DefaultQueryTimeout = 60 * time.Second
// DefaultCancelQueryTimeout is the timeout for the request to cancel queries in Trino.
DefaultCancelQueryTimeout = 30 * time.Second
// ErrOperationNotSupported indicates that a database operation is not supported.
ErrOperationNotSupported = errors.New("trino: operation not supported")
// ErrQueryCancelled indicates that a query has been cancelled.
ErrQueryCancelled = errors.New("trino: query cancelled")
// ErrUnsupportedHeader indicates that the server response contains an unsupported header.
ErrUnsupportedHeader = errors.New("trino: server response contains an unsupported header")
)
// HTTP header names used when talking to the Trino server, followed by the
// names of the DSN query parameters understood by this driver.
const (
// Header carrying the URL-escaped statement text, and the fixed name under
// which exec registers that statement.
preparedStatementHeader = "X-Trino-Prepared-Statement"
preparedStatementName = "_trino_go"
// Request headers populated from the ClientSession.
trinoUserHeader = "X-Trino-User"
trinoSourceHeader = "X-Trino-Source"
trinoCatalogHeader = "X-Trino-Catalog"
trinoSchemaHeader = "X-Trino-Schema"
trinoSessionHeader = "X-Trino-Session"
// Response headers inspected by roundTrip after a successful request.
trinoSetCatalogHeader = "X-Trino-Set-Catalog"
trinoSetSchemaHeader = "X-Trino-Set-Schema"
trinoSetPathHeader = "X-Trino-Set-Path"
trinoSetSessionHeader = "X-Trino-Set-Session"
trinoClearSessionHeader = "X-Trino-Clear-Session"
trinoSetRoleHeader = "X-Trino-Set-Role"
// Request header carrying ClientSession.TransactionId.
trinoTransactionHeader = "X-Trino-Transaction-Id"
// DSN query parameter names for the Kerberos and TLS certificate settings.
KerberosEnabledConfig = "KerberosEnabled"
kerberosKeytabPathConfig = "KerberosKeytabPath"
kerberosPrincipalConfig = "KerberosPrincipal"
kerberosRealmConfig = "KerberosRealm"
kerberosConfigPathConfig = "KerberosConfigPath"
SSLCertPathConfig = "SSLCertPath"
)
var (
// responseToRequestHeaderMap maps a response header to the request header
// whose session value it replaces; roundTrip applies it on every 200 response.
responseToRequestHeaderMap = map[string]string{
trinoSetSchemaHeader: trinoSchemaHeader,
trinoSetCatalogHeader: trinoCatalogHeader,
}
// unsupportedResponseHeaders lists response headers that make roundTrip
// fail with ErrUnsupportedHeader when they carry a non-empty value.
unsupportedResponseHeaders = []string{
trinoSetPathHeader,
trinoSetRoleHeader,
}
)
// sqldriver is the database/sql driver registered under the name "trino".
type sqldriver struct{}

// Compile-time check that *sqldriver satisfies driver.Driver.
var _ driver.Driver = (*sqldriver)(nil)

// Open implements driver.Driver. The name argument is the DSN; it is handed to
// newConn unchanged.
func (*sqldriver) Open(name string) (driver.Conn, error) {
	return newConn(name)
}
// Config is a configuration that can be encoded to a DSN string.
//
// KerberosEnabled is a string that FormatDSN parses with strconv.ParseBool;
// a value that does not parse is treated as false.
type Config struct {
ServerURI string // URI of the Trino server, e.g. http://user@localhost:8080
Source string // Source of the connection (optional)
Catalog string // Catalog (optional)
Schema string // Schema (optional)
SessionProperties map[string]string // Session properties (optional)
CustomClientName string // Custom client name (optional)
KerberosEnabled string // KerberosEnabled (optional, default is false)
KerberosKeytabPath string // Kerberos Keytab Path (optional)
KerberosPrincipal string // Kerberos Principal used to authenticate to KDC (optional)
KerberosRealm string // The Kerberos Realm (optional)
KerberosConfigPath string // The krb5 config path (optional)
SSLCertPath string // The SSL cert path for TLS verification (optional)
}
// FormatDSN renders the configuration as a DSN: the server URI with the other
// settings encoded as query parameters.
//
// The "source" parameter defaults to "trino-go-client". SSLCertPath is only
// emitted for https URIs. Enabling Kerberos on a non-https URI is an error.
// Session properties are joined as comma separated key=value pairs; their
// order follows map iteration and is therefore not stable between calls.
func (c *Config) FormatDSN() (string, error) {
	dsn, err := url.Parse(c.ServerURI)
	if err != nil {
		return "", err
	}

	params := url.Values{}
	if c.Source != "" {
		params.Add("source", c.Source)
	} else {
		params.Add("source", "trino-go-client")
	}

	secure := dsn.Scheme == "https"
	if secure && c.SSLCertPath != "" {
		params.Add(SSLCertPathConfig, c.SSLCertPath)
	}

	// A value that does not parse as a bool leaves Kerberos disabled.
	if useKerberos, _ := strconv.ParseBool(c.KerberosEnabled); useKerberos {
		if !secure {
			return "", fmt.Errorf("trino: client configuration error, SSL must be enabled for secure env")
		}
		params.Add(KerberosEnabledConfig, "true")
		params.Add(kerberosKeytabPathConfig, c.KerberosKeytabPath)
		params.Add(kerberosPrincipalConfig, c.KerberosPrincipal)
		params.Add(kerberosRealmConfig, c.KerberosRealm)
		params.Add(kerberosConfigPathConfig, c.KerberosConfigPath)
	}

	var props []string
	for name, value := range c.SessionProperties {
		props = append(props, name+"="+value)
	}

	// Optional parameters are written only when they have a value.
	optional := []struct{ key, value string }{
		{"catalog", c.Catalog},
		{"schema", c.Schema},
		{"session_properties", strings.Join(props, ",")},
		{"custom_client", c.CustomClientName},
	}
	for _, opt := range optional {
		if opt.value != "" {
			params.Set(opt.key, opt.value)
		}
	}

	dsn.RawQuery = params.Encode()
	return dsn.String(), nil
}
// Conn is a Trino connection.
//
// It holds no network connection of its own: every statement is sent as an
// HTTP request built by newRequest and executed by roundTrip.
type Conn struct {
// scheme://host of the server, taken from the DSN.
baseURL string
// DSN user info; newConn sets it only for https DSNs with a non-empty password.
auth *url.Userinfo
// Copy of the HTTP client selected by newConn.
httpClient http.Client
// Session state sent as headers with every request; updated by roundTrip.
clientSession *ClientSession
// Kerberos client used for SPNEGO when kerberosEnabled is true.
kerberosClient client.Client
kerberosEnabled bool
}
// Compile-time checks that *Conn implements the driver interfaces.
var (
_ driver.Conn = &Conn{}
_ driver.ConnPrepareContext = &Conn{}
)
// ClientSession holds the per-connection session state that GetHttpHeaders
// turns into X-Trino-* request headers.
type ClientSession struct {
Catalog string
Schema string
Source string
User string
TransactionId string
// Session properties, sent as comma separated key=value pairs.
Properties map[string]string
}
// SetHttpHeader stores value in the session field that corresponds to the
// given request header name (catalog, schema, source, user or transaction id).
//
// It returns an error naming the header when the header does not map to a
// session field; the session is left unchanged in that case.
func (c *ClientSession) SetHttpHeader(header string, value string) error {
	switch header {
	case trinoCatalogHeader:
		c.Catalog = value
	case trinoSchemaHeader:
		c.Schema = value
	case trinoSourceHeader:
		c.Source = value
	case trinoUserHeader:
		c.User = value
	case trinoTransactionHeader:
		c.TransactionId = value
	default:
		return fmt.Errorf("trino: unsupported session header %q", header)
	}
	return nil
}
// GetHttpHeaders builds the request headers that describe this session.
//
// User, source, catalog and schema are included only when set. The session
// properties are folded into X-Trino-Session as comma separated key=value
// pairs; that header and X-Trino-Transaction-Id are always added, even when
// their value is empty. Use this method rather than reading the fields
// directly so that the session properties are included.
func (c *ClientSession) GetHttpHeaders() http.Header {
	headers := http.Header{}
	addIfSet := func(name, value string) {
		if value != "" {
			headers.Add(name, value)
		}
	}
	addIfSet(trinoUserHeader, c.User)
	addIfSet(trinoSourceHeader, c.Source)
	addIfSet(trinoCatalogHeader, c.Catalog)
	addIfSet(trinoSchemaHeader, c.Schema)

	// Fold the session properties into a single X-Trino-Session value.
	var props []string
	for name, value := range c.Properties {
		props = append(props, name+"="+value)
	}
	headers.Add(trinoSessionHeader, strings.Join(props, ","))
	headers.Add(trinoTransactionHeader, c.TransactionId)
	return headers
}
// newConn parses dsn and builds a connection from it.
//
// The DSN is a URL whose query parameters configure the connection:
// catalog, schema, source, transaction_id, session_properties, custom_client,
// SSLCertPath and the Kerberos* settings. When Kerberos is enabled the keytab
// and krb5 config are loaded and a login against the KDC is performed here.
//
// The HTTP client is, in order of precedence, the registered custom client
// named by custom_client, a client trusting the certificates in SSLCertPath
// (https only), or http.DefaultClient. Basic auth credentials from the DSN are
// only kept for https URLs with a non-empty password.
//
// An error is returned for a malformed DSN, a Kerberos setup failure, an
// unknown custom client, or a certificate file that cannot be read or
// contains no PEM certificates.
func newConn(dsn string) (*Conn, error) {
	serverURL, err := url.Parse(dsn)
	if err != nil {
		return nil, fmt.Errorf("trino: malformed dsn: %v", err)
	}
	query := serverURL.Query()

	// A value that does not parse as a bool leaves Kerberos disabled.
	kerberosEnabled, _ := strconv.ParseBool(query.Get(KerberosEnabledConfig))
	var kerberosClient client.Client
	if kerberosEnabled {
		kt, err := keytab.Load(query.Get(kerberosKeytabPathConfig))
		if err != nil {
			return nil, fmt.Errorf("trino: Error loading Keytab: %v", err)
		}
		kerberosClient = client.NewClientWithKeytab(query.Get(kerberosPrincipalConfig), query.Get(kerberosRealmConfig), kt)
		conf, err := config.Load(query.Get(kerberosConfigPathConfig))
		if err != nil {
			return nil, fmt.Errorf("trino: Error loading krb config: %v", err)
		}
		kerberosClient.WithConfig(conf)
		loginErr := kerberosClient.Login()
		if loginErr != nil {
			return nil, fmt.Errorf("trino: Error login to KDC: %v", loginErr)
		}
	}

	var httpClient = http.DefaultClient
	if clientKey := query.Get("custom_client"); clientKey != "" {
		httpClient = getCustomClient(clientKey)
		if httpClient == nil {
			return nil, fmt.Errorf("trino: custom client not registered: %q", clientKey)
		}
	} else if certPath := query.Get(SSLCertPathConfig); certPath != "" && serverURL.Scheme == "https" {
		cert, err := ioutil.ReadFile(certPath)
		if err != nil {
			return nil, fmt.Errorf("trino: Error loading SSL Cert File: %v", err)
		}
		certPool := x509.NewCertPool()
		// An empty pool would make every TLS handshake fail later with a
		// confusing verification error, so reject unusable files up front.
		if !certPool.AppendCertsFromPEM(cert) {
			return nil, fmt.Errorf("trino: Error loading SSL Cert File: no PEM certificates found in %q", certPath)
		}
		httpClient = &http.Client{
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{
					RootCAs: certPool,
				},
			},
		}
	}

	c := &Conn{
		baseURL:         serverURL.Scheme + "://" + serverURL.Host,
		httpClient:      *httpClient,
		kerberosClient:  kerberosClient,
		kerberosEnabled: kerberosEnabled,
	}

	var user string
	if serverURL.User != nil {
		user = serverURL.User.Username()
		pass, _ := serverURL.User.Password()
		// Credentials are only ever sent over https.
		if pass != "" && serverURL.Scheme == "https" {
			c.auth = serverURL.User
		}
	}

	c.clientSession = &ClientSession{
		Catalog:       query.Get("catalog"),
		Schema:        query.Get("schema"),
		User:          user,
		Source:        query.Get("source"),
		TransactionId: query.Get("transaction_id"),
		Properties:    c.getHeaderPropertyKV(query.Get("session_properties")),
	}
	return c, nil
}
// customClientRegistry is the registry for custom http clients, keyed by the
// name used in the DSN's custom_client parameter. The embedded RWMutex guards
// Index; clients are stored by value.
var customClientRegistry = struct {
sync.RWMutex
Index map[string]http.Client
}{
Index: make(map[string]http.Client),
}
// RegisterCustomClient associates a client to a key in the driver's registry.
//
// Register your custom client in the driver, then refer to it by name in the DSN, on the call to sql.Open:
//
//	foobarClient := &http.Client{
//		Transport: &http.Transport{
//			Proxy: http.ProxyFromEnvironment,
//			DialContext: (&net.Dialer{
//				Timeout:   30 * time.Second,
//				KeepAlive: 30 * time.Second,
//				DualStack: true,
//			}).DialContext,
//			MaxIdleConns:          100,
//			IdleConnTimeout:       90 * time.Second,
//			TLSHandshakeTimeout:   10 * time.Second,
//			ExpectContinueTimeout: 1 * time.Second,
//			TLSClientConfig:       &tls.Config{
//			// your config here...
//			},
//		},
//	}
//	trino.RegisterCustomClient("foobar", foobarClient)
//	db, err := sql.Open("trino", "https://user@localhost:8080?custom_client=foobar")
//
// Keys that parse as a boolean ("true", "0", ...) are reserved and rejected,
// as is a nil client. The client is stored by value, so later changes to
// *client are not seen by the driver.
func RegisterCustomClient(key string, client *http.Client) error {
	if _, err := strconv.ParseBool(key); err == nil {
		return fmt.Errorf("trino: custom client key %q is reserved", key)
	}
	if client == nil {
		// Dereferencing below would panic; report the misuse instead.
		return fmt.Errorf("trino: custom client for key %q must not be nil", key)
	}
	customClientRegistry.Lock()
	customClientRegistry.Index[key] = *client
	customClientRegistry.Unlock()
	return nil
}
// DeregisterCustomClient drops the client registered under key. Removing a
// key that was never registered is a no-op.
func DeregisterCustomClient(key string) {
	customClientRegistry.Lock()
	defer customClientRegistry.Unlock()
	delete(customClientRegistry.Index, key)
}
// getCustomClient returns a pointer to a copy of the client registered under
// key, or nil when no such client exists.
func getCustomClient(key string) *http.Client {
	customClientRegistry.RLock()
	registered, found := customClientRegistry.Index[key]
	customClientRegistry.RUnlock()
	if !found {
		return nil
	}
	return &registered
}
// Begin implements the driver.Conn interface. Transactions are not available
// through this driver, so it always fails with ErrOperationNotSupported.
func (c *Conn) Begin() (driver.Tx, error) {
	var noTx driver.Tx
	return noTx, ErrOperationNotSupported
}
// Prepare implements the driver.Conn interface. It always returns
// driver.ErrSkip; statements are created through PrepareContext instead.
func (c *Conn) Prepare(query string) (driver.Stmt, error) {
	var noStmt driver.Stmt
	return noStmt, driver.ErrSkip
}
// PrepareContext implements the driver.ConnPrepareContext interface. It only
// records the query text on a new statement; nothing is sent to the server
// until the statement is executed.
func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
	stmt := &driverStmt{
		query: query,
		conn:  c,
	}
	return stmt, nil
}
// Close implements the driver.Conn interface. The connection holds nothing
// that needs releasing, so it always succeeds.
func (c *Conn) Close() error {
	return nil
}
// newRequest builds an HTTP request for the given method, URL and body and
// decorates it for this connection.
//
// When Kerberos is enabled a SPNEGO header for the service "trino/<host>" is
// attached. The session headers are applied first and the per-call headers in
// hs second, so hs wins for any header present in both. Basic auth is set when
// the connection carries credentials.
func (c *Conn) newRequest(method, url string, body io.Reader, hs http.Header) (*http.Request, error) {
	request, err := http.NewRequest(method, url, body)
	if err != nil {
		return nil, fmt.Errorf("trino: %v", err)
	}

	if c.kerberosEnabled {
		spn := "trino/" + request.URL.Hostname()
		if err = c.kerberosClient.SetSPNEGOHeader(request, spn); err != nil {
			return nil, fmt.Errorf("error setting client SPNEGO header: %v", err)
		}
	}

	// Later sources replace earlier ones header by header.
	for _, source := range []http.Header{c.clientSession.GetHttpHeaders(), hs} {
		for name, values := range source {
			request.Header[name] = values
		}
	}

	if c.auth != nil {
		password, _ := c.auth.Password()
		request.SetBasicAuth(c.auth.Username(), password)
	}
	return request, nil
}
// getHeaderValues splits a comma separated header value into its items. The
// items are returned as-is, without trimming; an empty input yields a single
// empty item.
func (c *Conn) getHeaderValues(headerValueStrng string) []string {
	const separator = ","
	return strings.Split(headerValueStrng, separator)
}
// getHeaderPropertyKV parses a comma separated list of key=value pairs, as
// used for session properties, into a map.
//
// Each item is split at its first "=" only, so values may themselves contain
// "=". Spaces around keys and values are trimmed. Items without "=" or with
// an empty key are skipped. The returned map is never nil.
func (c *Conn) getHeaderPropertyKV(headerValueStrng string) map[string]string {
	headerPropertyKV := make(map[string]string)
	for _, item := range c.getHeaderValues(headerValueStrng) {
		kv := strings.SplitN(item, "=", 2)
		if len(kv) < 2 {
			continue
		}
		key := strings.Trim(kv[0], " ")
		if key == "" {
			continue
		}
		headerPropertyKV[key] = strings.Trim(kv[1], " ")
	}
	return headerPropertyKV
}
// roundTrip sends req and returns the response for a 200 status.
//
// A 503 response is retried with a growing delay (starting at 100ms, scaled
// by the golden ratio, capped at 15s) until ctx is done. Before each retry the
// request body is recreated through req.GetBody, because the previous attempt
// consumed it. Any other status is converted to an *ErrQueryFailed. Each
// attempt uses the context deadline as client timeout, or DefaultQueryTimeout
// when the context has none.
//
// On success the session is updated from the response headers; see
// applySessionResponseHeaders. The caller owns the returned response body.
func (c *Conn) roundTrip(ctx context.Context, req *http.Request) (*http.Response, error) {
	delay := 100 * time.Millisecond
	const maxDelayBetweenRequests = float64(15 * time.Second)
	timer := time.NewTimer(0)
	defer timer.Stop()
	retrying := false
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-timer.C:
			timeout := DefaultQueryTimeout
			if deadline, ok := ctx.Deadline(); ok {
				timeout = time.Until(deadline)
			}
			if retrying && req.GetBody != nil {
				// The previous attempt drained the body; get a fresh reader.
				body, err := req.GetBody()
				if err != nil {
					return nil, &ErrQueryFailed{Reason: err}
				}
				req.Body = body
			}
			retrying = true
			client := c.httpClient
			client.Timeout = timeout
			req.Cancel = ctx.Done()
			resp, err := client.Do(req)
			if err != nil {
				return nil, &ErrQueryFailed{Reason: err}
			}
			switch resp.StatusCode {
			case http.StatusOK:
				if err := c.applySessionResponseHeaders(resp.Header); err != nil {
					// The caller never sees resp on error, so release it here.
					resp.Body.Close()
					return nil, err
				}
				return resp, nil
			case http.StatusServiceUnavailable:
				resp.Body.Close()
				timer.Reset(delay)
				delay = time.Duration(math.Min(
					float64(delay)*math.Phi,
					maxDelayBetweenRequests,
				))
				continue
			default:
				return nil, newErrQueryFailedFromResponse(resp)
			}
		}
	}
}

// applySessionResponseHeaders updates the client session from the headers of
// a successful response, so that the next request carries the new state.
//
// Catalog and schema changes are copied into the session. A non-empty
// X-Trino-Set-Path or X-Trino-Set-Role yields ErrUnsupportedHeader. Every
// X-Trino-Set-Session header line adds or replaces session properties, and
// every X-Trino-Clear-Session header line removes the named properties.
func (c *Conn) applySessionResponseHeaders(h http.Header) error {
	for src, dst := range responseToRequestHeaderMap {
		if v := h.Get(src); v != "" {
			if c.clientSession.SetHttpHeader(dst, v) != nil {
				return ErrUnsupportedHeader
			}
		}
	}
	for _, name := range unsupportedResponseHeaders {
		if h.Get(name) != "" {
			return ErrUnsupportedHeader
		}
	}

	// The header may be repeated, once per property; process every line.
	for _, line := range h[http.CanonicalHeaderKey(trinoSetSessionHeader)] {
		for name, value := range c.getHeaderPropertyKV(line) {
			if c.clientSession.Properties == nil {
				c.clientSession.Properties = make(map[string]string)
			}
			c.clientSession.Properties[name] = value
		}
	}

	// Clear-Session carries bare property names; a "name=value" form is
	// tolerated by ignoring everything from the "=" on.
	for _, line := range h[http.CanonicalHeaderKey(trinoClearSessionHeader)] {
		for _, item := range strings.Split(line, ",") {
			name := item
			if i := strings.Index(name, "="); i >= 0 {
				name = name[:i]
			}
			if name = strings.TrimSpace(name); name != "" {
				delete(c.clientSession.Properties, name)
			}
		}
	}
	return nil
}
// ErrQueryFailed reports a failed query. StatusCode is the HTTP status of the
// response, or zero when no response was received; Reason is the cause.
type ErrQueryFailed struct {
	StatusCode int
	Reason     error
}

// Error implements the error interface. The message contains the status code,
// its standard text, and the quoted reason.
func (e *ErrQueryFailed) Error() string {
	statusText := http.StatusText(e.StatusCode)
	return fmt.Sprintf("trino: query failed (%d %s): %q", e.StatusCode, statusText, e.Reason)
}
// newErrQueryFailedFromResponse turns an unexpected HTTP response into an
// *ErrQueryFailed and closes the response body.
//
// At most 8 KiB of the body are read and used as the reason; "..." is appended
// when the declared content length exceeds that limit. A read error becomes
// the reason itself.
func newErrQueryFailedFromResponse(resp *http.Response) *ErrQueryFailed {
	const maxBytes = 8 * 1024
	defer resp.Body.Close()

	failure := &ErrQueryFailed{StatusCode: resp.StatusCode}
	payload, err := ioutil.ReadAll(io.LimitReader(resp.Body, maxBytes))
	switch {
	case err != nil:
		failure.Reason = err
	case resp.ContentLength > maxBytes:
		failure.Reason = errors.New(string(payload) + "...")
	default:
		failure.Reason = errors.New(string(payload))
	}
	return failure
}
// driverStmt is a statement bound to a connection. It only stores the query
// text; the query is sent to the server by exec.
type driverStmt struct {
conn *Conn
query string
// user is set by exec when an argument named X-Trino-User is passed, and is
// then sent as the user header on follow-up requests for the same query.
user string
}
// Compile-time checks that *driverStmt implements the driver interfaces.
var (
_ driver.Stmt = &driverStmt{}
_ driver.StmtQueryContext = &driverStmt{}
_ driver.StmtExecContext = &driverStmt{}
)
// Close implements driver.Stmt. A statement owns no resources, so closing it
// always succeeds.
func (st *driverStmt) Close() error {
	return nil
}
// NumInput implements driver.Stmt. It returns -1, which tells database/sql
// that the number of placeholders is unknown and must not be checked.
func (st *driverStmt) NumInput() int {
	const unknown = -1
	return unknown
}
// Exec implements driver.Stmt. It always returns driver.ErrSkip; execution is
// implemented by ExecContext.
func (st *driverStmt) Exec(args []driver.Value) (driver.Result, error) {
	var noResult driver.Result
	return noResult, driver.ErrSkip
}
// ExecContext implements driver.StmtExecContext. It submits the statement and
// then fetches result pages until the server reports the end of the results,
// so the statement has run to completion when it returns. The returned
// driver.Result is the drained row set, which carries the update count.
func (st *driverStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
	first, err := st.exec(ctx, args)
	if err != nil {
		return nil, err
	}

	result := &driverRows{
		ctx:          ctx,
		stmt:         st,
		queryID:      first.ID,
		nextURI:      first.NextURI,
		rowsAffected: first.UpdateCount,
	}

	// Consume all results, if there are any. The loop can only end with a
	// non-nil error; io.EOF is the normal end of the result stream.
	for {
		if err = result.fetch(true); err != nil {
			break
		}
	}
	if err != io.EOF {
		return nil, err
	}
	return result, nil
}
// stmtResponse is the JSON document that exec decodes from the response to
// the statement submission request.
type stmtResponse struct {
ID string `json:"id"`
InfoURI string `json:"infoUri"`
// URI that fetchX requests for the next page; empty when there is none.
NextURI string `json:"nextUri"`
Stats stmtStats `json:"stats"`
// A non-empty Error.ErrorName marks the response as failed.
Error stmtError `json:"error"`
UpdateType string `json:"updateType"`
UpdateCount int64 `json:"updateCount"`
}
// stmtStats is the "stats" object of a statement or query response.
type stmtStats struct {
State string `json:"state"`
Scheduled bool `json:"scheduled"`
Nodes int `json:"nodes"`
TotalSplits int `json:"totalSplits"`
// Decoded from the "queuedSplits" JSON field despite the field's spelling.
QueuesSplits int `json:"queuedSplits"`
RunningSplits int `json:"runningSplits"`
CompletedSplits int `json:"completedSplits"`
UserTimeMillis int `json:"userTimeMillis"`
CPUTimeMillis int `json:"cpuTimeMillis"`
WallTimeMillis int `json:"wallTimeMillis"`
ProcessedRows int `json:"processedRows"`
ProcessedBytes int `json:"processedBytes"`
RootStage stmtStage `json:"rootStage"`
}
// stmtError is the "error" object of a statement or query response. A
// response is treated as failed when ErrorName is non-empty.
type stmtError struct {
	Message       string               `json:"message"`
	ErrorName     string               `json:"errorName"`
	ErrorCode     int                  `json:"errorCode"`
	ErrorLocation stmtErrorLocation    `json:"errorLocation"`
	FailureInfo   stmtErrorFailureInfo `json:"failureInfo"`
	// Other fields omitted
}

// stmtErrorLocation is the position in the statement text that an error
// refers to.
type stmtErrorLocation struct {
	LineNumber   int `json:"lineNumber"`
	ColumnNumber int `json:"columnNumber"`
}

// stmtErrorFailureInfo carries the type name of the reported failure.
type stmtErrorFailureInfo struct {
	Type string `json:"type"`
	// Other fields omitted
}

// Error implements the error interface as "<failure type>: <message>".
func (e stmtError) Error() string {
	return fmt.Sprintf("%s: %s", e.FailureInfo.Type, e.Message)
}
// stmtStage is one stage in the "rootStage" tree of the response statistics;
// SubStages holds the nested stages.
type stmtStage struct {
StageID string `json:"stageId"`
State string `json:"state"`
Done bool `json:"done"`
Nodes int `json:"nodes"`
TotalSplits int `json:"totalSplits"`
QueuedSplits int `json:"queuedSplits"`
RunningSplits int `json:"runningSplits"`
CompletedSplits int `json:"completedSplits"`
UserTimeMillis int `json:"userTimeMillis"`
CPUTimeMillis int `json:"cpuTimeMillis"`
WallTimeMillis int `json:"wallTimeMillis"`
ProcessedRows int `json:"processedRows"`
ProcessedBytes int `json:"processedBytes"`
SubStages []stmtStage `json:"subStages"`
}
// Query implements driver.Stmt. It always returns driver.ErrSkip; querying is
// implemented by QueryContext.
func (st *driverStmt) Query(args []driver.Value) (driver.Rows, error) {
	var noRows driver.Rows
	return noRows, driver.ErrSkip
}
// QueryContext implements driver.StmtQueryContext. It submits the statement
// and fetches until a page with data, or the end of the results, has been
// received, so that errors surface here rather than on the first Next call.
func (st *driverStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
	first, err := st.exec(ctx, args)
	if err != nil {
		return nil, err
	}

	rows := &driverRows{
		ctx:     ctx,
		stmt:    st,
		queryID: first.ID,
		nextURI: first.NextURI,
	}
	if fetchErr := rows.fetch(false); fetchErr != nil {
		return nil, fetchErr
	}
	return rows, nil
}
// exec submits the statement to the server and returns the decoded first
// response.
//
// Without arguments the query text is posted as-is. With arguments, each one
// is serialized with Serial. An argument named X-Trino-User is not a query
// parameter: its value must be a string and is sent as the user header for
// this statement. All other arguments turn the request into an
// "EXECUTE ... USING" of the statement, whose text travels URL-escaped in the
// prepared statement header.
//
// A non-nil response may be returned together with an error when the server
// reports a query error in the response document.
func (st *driverStmt) exec(ctx context.Context, args []driver.NamedValue) (*stmtResponse, error) {
	query := st.query
	var hs http.Header
	if len(args) > 0 {
		hs = make(http.Header)
		var ss []string
		for _, arg := range args {
			s, err := Serial(arg.Value)
			if err != nil {
				return nil, err
			}
			if arg.Name == trinoUserHeader {
				// Checked assertion: a non-string user must not panic.
				user, ok := arg.Value.(string)
				if !ok {
					return nil, fmt.Errorf("trino: %s must be a string, got %T", trinoUserHeader, arg.Value)
				}
				st.user = user
				hs.Add(trinoUserHeader, st.user)
				continue
			}
			if hs.Get(preparedStatementHeader) == "" {
				hs.Add(preparedStatementHeader, preparedStatementName+"="+url.QueryEscape(st.query))
			}
			ss = append(ss, s)
		}
		if len(ss) > 0 {
			query = "EXECUTE " + preparedStatementName + " USING " + strings.Join(ss, ", ")
		}
	}

	// roundTrip stores the session headers of each response on the
	// connection, so that the next request carries them.
	req, err := st.conn.newRequest("POST", st.conn.baseURL+"/v1/statement", strings.NewReader(query), hs)
	if err != nil {
		return nil, err
	}
	resp, err := st.conn.roundTrip(ctx, req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var sr stmtResponse
	d := json.NewDecoder(resp.Body)
	d.UseNumber()
	if err = d.Decode(&sr); err != nil {
		return nil, fmt.Errorf("trino: %v", err)
	}
	return &sr, handleResponseError(resp.StatusCode, sr.Error)
}
// driverRows iterates over the result pages of one query. It serves both as
// driver.Rows (for queries) and as driver.Result (for executed statements).
type driverRows struct {
ctx context.Context
stmt *driverStmt
queryID string
// URI of the next result page; empty once the server has no more pages.
nextURI string
// Set when a fetch or Next finds that nextURI is empty.
finished bool
// Sticky error; once set, Next keeps returning it.
err error
// Index of the next row to hand out from data.
rowindex int
columns []string
coltype []*typeConverter
// Rows of the page fetched most recently.
data []queryData
rowsAffected int64
}
// Compile-time checks that *driverRows implements the driver interfaces.
var _ driver.Rows = &driverRows{finished: false}
var _ driver.Result = &driverRows{}
// Close closes the rows iterator.
//
// It does nothing when iteration ended with io.EOF or sql.ErrNoRows, or when
// the result set is not marked finished. Otherwise it marks the rows as
// closed and sends a DELETE for the query, bounded by
// DefaultCancelQueryTimeout. A 204 reply counts as success; after any other
// successful round trip the stored io.EOF is returned.
//
// NOTE(review): skipping the DELETE while the query is NOT finished looks
// inverted, since an abandoned query is then never cancelled on the server.
// Confirm the intent before changing it.
func (qr *driverRows) Close() error {
	switch {
	case qr.err == sql.ErrNoRows, qr.err == io.EOF, !qr.finished:
		return nil
	}
	qr.err = io.EOF

	headers := http.Header{}
	if user := qr.stmt.user; user != "" {
		headers.Add(trinoUserHeader, user)
	}
	conn := qr.stmt.conn
	req, err := conn.newRequest("DELETE", conn.baseURL+"/v1/query/"+url.PathEscape(qr.queryID), nil, headers)
	if err != nil {
		return err
	}

	// The caller's context may already be cancelled, so use a fresh one.
	ctx, cancel := context.WithTimeout(context.Background(), DefaultCancelQueryTimeout)
	defer cancel()

	resp, err := conn.roundTrip(ctx, req)
	if err == nil {
		resp.Body.Close()
		return qr.err
	}
	// roundTrip reports every status other than 200 as a failure, including
	// the 204 that acknowledges the cancellation.
	if failed, ok := err.(*ErrQueryFailed); ok && failed.StatusCode == http.StatusNoContent {
		qr.nextURI = ""
		return nil
	}
	return err
}
// Columns returns the names of the columns, fetching a result page first when
// they are not known yet. After an error, whether earlier or from that fetch,
// it returns an empty slice; a fetch error is remembered on the rows.
func (qr *driverRows) Columns() []string {
	if qr.err != nil {
		return []string{}
	}
	if qr.columns != nil {
		return qr.columns
	}
	if err := qr.fetch(false); err != nil {
		qr.err = err
		return []string{}
	}
	return qr.columns
}
// coltypeLengthSuffix matches a trailing numeric length such as "(10)".
var coltypeLengthSuffix = regexp.MustCompile(`\(\d+\)$`)

// ColumnTypeDatabaseTypeName returns the server type name of the column at
// index with a trailing numeric length removed, e.g. "varchar(10)" becomes
// "varchar". The index must be valid for the fetched columns.
func (qr *driverRows) ColumnTypeDatabaseTypeName(index int) string {
	typeName := qr.coltype[index].typeName
	if suffix := coltypeLengthSuffix.FindString(typeName); suffix != "" {
		return strings.TrimSuffix(typeName, suffix)
	}
	return typeName
}
// Next is called to populate the next row of data into
// the provided slice. The provided slice will be the same
// size as the Columns() are wide.
//
// Next should return io.EOF when there are no more rows.
//
// When the current page is exhausted the next one is fetched. Any error,
// including io.EOF, is remembered and returned again by later calls.
// sql.ErrNoRows is returned when no column information is available.
func (qr *driverRows) Next(dest []driver.Value) error {
	if qr.err != nil {
		return qr.err
	}

	pageExhausted := qr.columns == nil || qr.rowindex >= len(qr.data)
	if pageExhausted {
		if qr.nextURI == "" {
			qr.finished = true
			qr.err = io.EOF
			return qr.err
		}
		if err := qr.fetch(true); err != nil {
			qr.err = err
			return err
		}
	}

	if len(qr.coltype) == 0 {
		qr.err = sql.ErrNoRows
		return qr.err
	}

	row := qr.data[qr.rowindex]
	for i, converter := range qr.coltype {
		value, err := converter.ConvertValue(row[i])
		if err != nil {
			qr.err = err
			return err
		}
		dest[i] = value
	}
	qr.rowindex++
	return nil
}
// LastInsertId implements driver.Result. Generated IDs are not available
// through this driver, so it always fails with ErrOperationNotSupported.
func (qr driverRows) LastInsertId() (int64, error) {
	const noID int64 = 0
	return noID, ErrOperationNotSupported
}
// RowsAffected implements driver.Result. It returns the update count of the
// last fetched response together with the error stored on the rows, if any.
func (qr driverRows) RowsAffected() (int64, error) {
	affected, err := qr.rowsAffected, qr.err
	return affected, err
}
// queryResponse is the JSON document that fetchX decodes for each result
// page.
type queryResponse struct {
ID string `json:"id"`
InfoURI string `json:"infoUri"`
PartialCancelURI string `json:"partialCancelUri"`
// URI of the following page; empty on the last page.
NextURI string `json:"nextUri"`
Columns []queryColumn `json:"columns"`
Data []queryData `json:"data"`
Stats stmtStats `json:"stats"`
// A non-empty Error.ErrorName marks the response as failed.
Error stmtError `json:"error"`
UpdateType string `json:"updateType"`
UpdateCount int64 `json:"updateCount"`
}
// queryColumn describes one result column. initColumns uses Name and Type;
// TypeSignature is decoded but not read in the code shown here.
type queryColumn struct {
Name string `json:"name"`
Type string `json:"type"`
TypeSignature typeSignature `json:"typeSignature"`
}
// queryData is one result row; element i belongs to column i.
type queryData []interface{}
// typeSignature is the structured form of a column type as sent by the server.
type typeSignature struct {
RawType string `json:"rawType"`
TypeArguments []interface{} `json:"typeArguments"`
LiteralArguments []interface{} `json:"literalArguments"`
}
// handleResponseError converts the error object of a response into a Go
// error. An empty error name means success (nil), "USER_CANCELLED" maps to
// ErrQueryCancelled, and anything else becomes an *ErrQueryFailed that
// carries the HTTP status and the server error as its reason.
func handleResponseError(status int, respErr stmtError) error {
	if respErr.ErrorName == "" {
		return nil
	}
	if respErr.ErrorName == "USER_CANCELLED" {
		return ErrQueryCancelled
	}
	failure := &ErrQueryFailed{StatusCode: status}
	failure.Reason = &respErr
	return failure
}
// fetch requests result pages until fetchX reports that no further request
// is needed right now, or until it fails. allowEOF is passed through to
// fetchX and decides whether the end of the results is reported as io.EOF.
func (qr *driverRows) fetch(allowEOF bool) error {
	again, err := qr.fetchX(allowEOF)
	for err == nil && again {
		again, err = qr.fetchX(allowEOF)
	}
	return err
}
// fetchX requests a single result page and stores it on the rows.
//
// The boolean result is true when the page was empty but the server
// announced another one, i.e. the caller should fetch again. When there is
// no next page, the rows are marked finished and, if allowEOF is set, io.EOF
// is returned. An empty last page is likewise reported as io.EOF only when
// allowEOF is set.
//
// The user header is sent only when the statement carries a user; otherwise
// the session's user header set by newRequest stays in effect, matching what
// Close does.
func (qr *driverRows) fetchX(allowEOF bool) (bool, error) {
	if qr.nextURI == "" {
		qr.finished = true
		if allowEOF {
			return false, io.EOF
		}
		return false, nil
	}

	hs := make(http.Header)
	if qr.stmt.user != "" {
		hs.Add(trinoUserHeader, qr.stmt.user)
	}
	req, err := qr.stmt.conn.newRequest("GET", qr.nextURI, nil, hs)
	if err != nil {
		return false, err
	}
	resp, err := qr.stmt.conn.roundTrip(qr.ctx, req)
	if err != nil {
		if qr.ctx.Err() == context.Canceled {
			// Best effort; the round trip error is what the caller gets.
			qr.Close()
		}
		return false, err
	}
	defer resp.Body.Close()

	var qresp queryResponse
	d := json.NewDecoder(resp.Body)
	d.UseNumber()
	if err = d.Decode(&qresp); err != nil {
		return false, fmt.Errorf("trino: %v", err)
	}
	if err = handleResponseError(resp.StatusCode, qresp.Error); err != nil {
		return false, err
	}

	qr.rowindex = 0
	qr.data = qresp.Data
	qr.nextURI = qresp.NextURI
	if len(qr.data) == 0 {
		if qr.nextURI != "" {
			return true, nil
		}
		if allowEOF {
			qr.err = io.EOF
			return false, qr.err
		}
	}
	if qr.columns == nil && len(qresp.Columns) > 0 {
		qr.initColumns(&qresp)
	}
	qr.rowsAffected = qresp.UpdateCount
	return false, nil
}
// initColumns records the column names of a response and creates one type
// converter per column from the column's type string.
func (qr *driverRows) initColumns(qresp *queryResponse) {
	count := len(qresp.Columns)
	names := make([]string, count)
	converters := make([]*typeConverter, count)
	for i := range qresp.Columns {
		names[i] = qresp.Columns[i].Name
		converters[i] = newTypeConverter(qresp.Columns[i].Type)
	}
	qr.columns, qr.coltype = names, converters
}
// typeConverter converts the decoded JSON values of one column into
// driver.Value according to the column's Trino type.
type typeConverter struct {
// Type name as reported by the server.
typeName string
parsedType []string // e.g. array, array, varchar, for [][]string
}
// newTypeConverter creates the converter for a column of the given type. The
// type name is kept verbatim and also stored in parsed form.
func newTypeConverter(typeName string) *typeConverter {
	converter := new(typeConverter)
	converter.typeName = typeName
	converter.parsedType = parseType(typeName)
	return converter
}
// parseType splits a Trino type name into its nesting levels, e.g.
// array(varchar(10)) becomes "array", "varchar". The name is lower-cased and
// cut at every "("; closing parentheses are stripped from the innermost
// segment, which is dropped when it is a plain integer (a length).
// TODO: Use queryColumn.TypeSignature instead.
func parseType(name string) []string {
	segments := strings.Split(strings.ToLower(name), "(")
	count := len(segments)
	if count > 1 {
		innermost := strings.TrimRight(segments[count-1], ")")
		segments[count-1] = innermost
		if innermost != "" {
			if _, err := strconv.Atoi(innermost); err == nil {
				segments = segments[:count-1]
			}
		}
	}
	return segments
}
// ConvertValue implements the driver.ValueConverter interface.
//
// It dispatches on the outermost parsed type (c.parsedType[0]). Scalar types
// are converted through the matching scanNull* helper, and a NULL (nil) input
// yields a nil driver.Value. Maps and arrays are only validated and then
// passed through unchanged. Any other type name is rejected with an error.
func (c *typeConverter) ConvertValue(v interface{}) (driver.Value, error) {
	outer := c.parsedType[0]
	switch outer {
	case "boolean":
		b, err := scanNullBool(v)
		if b.Valid {
			return b.Bool, err
		}
		return nil, err
	case "json", "char", "varchar", "uuid", "varbinary", "interval year to month", "interval day to second", "decimal", "ipaddress", "unknown":
		s, err := scanNullString(v)
		if s.Valid {
			return s.String, err
		}
		return nil, err
	case "tinyint", "smallint", "integer", "bigint":
		n, err := scanNullInt64(v)
		if n.Valid {
			return n.Int64, err
		}
		return nil, err
	case "real", "double":
		f, err := scanNullFloat64(v)
		if f.Valid {
			return f.Float64, err
		}
		return nil, err
	case "date", "time", "time with time zone", "timestamp", "timestamp with time zone":
		ts, err := scanNullTime(v)
		if ts.Valid {
			return ts.Time, err
		}
		return nil, err
	case "map":
		err := validateMap(v)
		if err != nil {
			return nil, err
		}
		return v, nil
	case "array":
		err := validateSlice(v)
		if err != nil {
			return nil, err
		}
		return v, nil
	}
	return nil, fmt.Errorf("type not supported: %q", c.typeName)
}
// validateMap reports an error unless v is nil or a map[string]interface{}.
func validateMap(v interface{}) error {
	switch v.(type) {
	case nil, map[string]interface{}:
		return nil
	}
	return fmt.Errorf("cannot convert %v (%T) to map", v, v)
}
// validateSlice reports an error unless v is nil or a []interface{}.
func validateSlice(v interface{}) error {
	switch v.(type) {
	case nil, []interface{}:
		return nil
	}
	return fmt.Errorf("cannot convert %v (%T) to slice", v, v)
}
// scanNullBool converts v to a sql.NullBool. A nil v yields the NULL
// (invalid) value; anything other than a bool is an error.
func scanNullBool(v interface{}) (sql.NullBool, error) {
	switch b := v.(type) {
	case nil:
		return sql.NullBool{}, nil
	case bool:
		return sql.NullBool{Valid: true, Bool: b}, nil
	default:
		return sql.NullBool{}, fmt.Errorf("cannot convert %v (%T) to bool", v, v)
	}
}
// NullSliceBool represents a slice of bool that may be null.
// Individual elements may be null as well, hence []sql.NullBool.
type NullSliceBool struct {
	SliceBool []sql.NullBool
	Valid     bool // Valid is true if SliceBool is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that scanNullBool
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSliceBool) Scan(value interface{}) error {
	if value == nil {
		*s = NullSliceBool{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to []bool", value, value)
	}
	slice := make([]sql.NullBool, len(vs))
	for i := range vs {
		v, err := scanNullBool(vs[i])
		if err != nil {
			return err
		}
		slice[i] = v
	}
	s.SliceBool = slice
	s.Valid = true
	return nil
}
// NullSlice2Bool represents a two-dimensional slice of bool that may be null.
type NullSlice2Bool struct {
	Slice2Bool [][]sql.NullBool
	Valid      bool // Valid is true if Slice2Bool is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or a row that NullSliceBool.Scan
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSlice2Bool) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice2Bool{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][]bool", value, value)
	}
	slice := make([][]sql.NullBool, len(vs))
	for i := range vs {
		// A fresh scanner per row; a NULL row leaves its slice nil.
		var ss NullSliceBool
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.SliceBool
	}
	s.Slice2Bool = slice
	s.Valid = true
	return nil
}
// NullSlice3Bool represents a three-dimensional slice of bool that may be null.
type NullSlice3Bool struct {
	Slice3Bool [][][]sql.NullBool
	Valid      bool // Valid is true if Slice3Bool is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that
// NullSlice2Bool.Scan rejects, produces an error and leaves the receiver
// unchanged.
func (s *NullSlice3Bool) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice3Bool{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][][]bool", value, value)
	}
	slice := make([][][]sql.NullBool, len(vs))
	for i := range vs {
		// A fresh scanner per element; a NULL element leaves its slice nil.
		var ss NullSlice2Bool
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.Slice2Bool
	}
	s.Slice3Bool = slice
	s.Valid = true
	return nil
}
// scanNullString converts v to a sql.NullString. A nil v yields the NULL
// (invalid) value; anything other than a string is an error.
func scanNullString(v interface{}) (sql.NullString, error) {
	switch s := v.(type) {
	case nil:
		return sql.NullString{}, nil
	case string:
		return sql.NullString{Valid: true, String: s}, nil
	default:
		return sql.NullString{}, fmt.Errorf("cannot convert %v (%T) to string", v, v)
	}
}
// NullSliceString represents a slice of string that may be null.
// Individual elements may be null as well, hence []sql.NullString.
type NullSliceString struct {
	SliceString []sql.NullString
	Valid       bool // Valid is true if SliceString is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that scanNullString
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSliceString) Scan(value interface{}) error {
	if value == nil {
		*s = NullSliceString{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to []string", value, value)
	}
	slice := make([]sql.NullString, len(vs))
	for i := range vs {
		v, err := scanNullString(vs[i])
		if err != nil {
			return err
		}
		slice[i] = v
	}
	s.SliceString = slice
	s.Valid = true
	return nil
}
// NullSlice2String represents a two-dimensional slice of string that may be null.
type NullSlice2String struct {
	Slice2String [][]sql.NullString
	Valid        bool // Valid is true if Slice2String is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or a row that NullSliceString.Scan
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSlice2String) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice2String{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][]string", value, value)
	}
	slice := make([][]sql.NullString, len(vs))
	for i := range vs {
		// A fresh scanner per row; a NULL row leaves its slice nil.
		var ss NullSliceString
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.SliceString
	}
	s.Slice2String = slice
	s.Valid = true
	return nil
}
// NullSlice3String represents a three-dimensional slice of string that may be null.
type NullSlice3String struct {
	Slice3String [][][]sql.NullString
	Valid        bool // Valid is true if Slice3String is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that
// NullSlice2String.Scan rejects, produces an error and leaves the receiver
// unchanged.
func (s *NullSlice3String) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice3String{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][][]string", value, value)
	}
	slice := make([][][]sql.NullString, len(vs))
	for i := range vs {
		// A fresh scanner per element; a NULL element leaves its slice nil.
		var ss NullSlice2String
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.Slice2String
	}
	s.Slice3String = slice
	s.Valid = true
	return nil
}
// scanNullInt64 converts v to a sql.NullInt64. A nil v yields the NULL
// (invalid) value. Any other v must be a json.Number whose Int64 conversion
// succeeds; otherwise an error is returned.
func scanNullInt64(v interface{}) (sql.NullInt64, error) {
	if v == nil {
		return sql.NullInt64{}, nil
	}
	if num, isNumber := v.(json.Number); isNumber {
		if n, err := num.Int64(); err == nil {
			return sql.NullInt64{Valid: true, Int64: n}, nil
		}
	}
	// Both failure modes (wrong type, unparsable number) share one message.
	return sql.NullInt64{}, fmt.Errorf("cannot convert %v (%T) to int64", v, v)
}
// NullSliceInt64 represents a slice of int64 that may be null.
// Individual elements may be null as well, hence []sql.NullInt64.
type NullSliceInt64 struct {
	SliceInt64 []sql.NullInt64
	Valid      bool // Valid is true if SliceInt64 is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that scanNullInt64
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSliceInt64) Scan(value interface{}) error {
	if value == nil {
		*s = NullSliceInt64{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to []int64", value, value)
	}
	slice := make([]sql.NullInt64, len(vs))
	for i := range vs {
		v, err := scanNullInt64(vs[i])
		if err != nil {
			return err
		}
		slice[i] = v
	}
	s.SliceInt64 = slice
	s.Valid = true
	return nil
}
// NullSlice2Int64 represents a two-dimensional slice of int64 that may be null.
type NullSlice2Int64 struct {
	Slice2Int64 [][]sql.NullInt64
	Valid       bool // Valid is true if Slice2Int64 is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or a row that NullSliceInt64.Scan
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSlice2Int64) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice2Int64{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][]int64", value, value)
	}
	slice := make([][]sql.NullInt64, len(vs))
	for i := range vs {
		// A fresh scanner per row; a NULL row leaves its slice nil.
		var ss NullSliceInt64
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.SliceInt64
	}
	s.Slice2Int64 = slice
	s.Valid = true
	return nil
}
// NullSlice3Int64 represents a three-dimensional slice of int64 that may be null.
type NullSlice3Int64 struct {
	Slice3Int64 [][][]sql.NullInt64
	Valid       bool // Valid is true if Slice3Int64 is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that
// NullSlice2Int64.Scan rejects, produces an error and leaves the receiver
// unchanged.
func (s *NullSlice3Int64) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice3Int64{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][][]int64", value, value)
	}
	slice := make([][][]sql.NullInt64, len(vs))
	for i := range vs {
		// A fresh scanner per element; a NULL element leaves its slice nil.
		var ss NullSlice2Int64
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.Slice2Int64
	}
	s.Slice3Int64 = slice
	s.Valid = true
	return nil
}
// scanNullFloat64 converts v to a sql.NullFloat64. A nil v yields the NULL
// (invalid) value. A json.Number is converted with its Float64 method. The
// strings "NaN", "Infinity" and "-Infinity" map to the corresponding
// floating-point special values. Everything else is an error.
func scanNullFloat64(v interface{}) (sql.NullFloat64, error) {
	switch x := v.(type) {
	case nil:
		return sql.NullFloat64{}, nil
	case json.Number:
		f, err := x.Float64()
		if err != nil {
			return sql.NullFloat64{}, fmt.Errorf("cannot convert %v (%T) to float64", x, x)
		}
		return sql.NullFloat64{Valid: true, Float64: f}, nil
	case string:
		// Non-finite values are delivered as these strings rather than as numbers.
		switch x {
		case "NaN":
			return sql.NullFloat64{Valid: true, Float64: math.NaN()}, nil
		case "Infinity":
			return sql.NullFloat64{Valid: true, Float64: math.Inf(+1)}, nil
		case "-Infinity":
			return sql.NullFloat64{Valid: true, Float64: math.Inf(-1)}, nil
		}
	}
	return sql.NullFloat64{}, fmt.Errorf("cannot convert %v (%T) to float64", v, v)
}
// NullSliceFloat64 represents a slice of float64 that may be null.
// Individual elements may be null as well, hence []sql.NullFloat64.
type NullSliceFloat64 struct {
	SliceFloat64 []sql.NullFloat64
	Valid        bool // Valid is true if SliceFloat64 is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that scanNullFloat64
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSliceFloat64) Scan(value interface{}) error {
	if value == nil {
		*s = NullSliceFloat64{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to []float64", value, value)
	}
	slice := make([]sql.NullFloat64, len(vs))
	for i := range vs {
		v, err := scanNullFloat64(vs[i])
		if err != nil {
			return err
		}
		slice[i] = v
	}
	s.SliceFloat64 = slice
	s.Valid = true
	return nil
}
// NullSlice2Float64 represents a two-dimensional slice of float64 that may be null.
type NullSlice2Float64 struct {
	Slice2Float64 [][]sql.NullFloat64
	Valid         bool // Valid is true if Slice2Float64 is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or a row that NullSliceFloat64.Scan
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSlice2Float64) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice2Float64{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][]float64", value, value)
	}
	slice := make([][]sql.NullFloat64, len(vs))
	for i := range vs {
		// A fresh scanner per row; a NULL row leaves its slice nil.
		var ss NullSliceFloat64
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.SliceFloat64
	}
	s.Slice2Float64 = slice
	s.Valid = true
	return nil
}
// NullSlice3Float64 represents a three-dimensional slice of float64 that may be null.
type NullSlice3Float64 struct {
	Slice3Float64 [][][]sql.NullFloat64
	Valid         bool // Valid is true if Slice3Float64 is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that
// NullSlice2Float64.Scan rejects, produces an error and leaves the receiver
// unchanged.
func (s *NullSlice3Float64) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice3Float64{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][][]float64", value, value)
	}
	slice := make([][][]sql.NullFloat64, len(vs))
	for i := range vs {
		// A fresh scanner per element; a NULL element leaves its slice nil.
		var ss NullSlice2Float64
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.Slice2Float64
	}
	s.Slice3Float64 = slice
	s.Valid = true
	return nil
}
// timeLayouts lists the layouts tried, in order, when parsing date, time and
// timestamp strings.
//
// The fractional part is written as ".999999999" rather than ".000": when
// parsing, ".000" demands exactly three fractional digits, whereas the
// nines form accepts a missing fraction or a fraction of variable length.
// Values that carry exactly three digits parse to the same instant as before.
//
// NOTE(review): in the code visible here these layouts are used only for
// parsing; confirm nothing formats with them, since formatting with nines
// drops trailing zeros.
var timeLayouts = []string{
	"2006-01-02",
	"15:04:05.999999999",
	"2006-01-02 15:04:05.999999999",
}
// scanNullTime converts v to a NullTime. A nil v yields the NULL (invalid)
// value; anything other than a string is an error.
//
// If the text after the last space is non-empty and does not start with a
// digit, it is treated as a time zone name and the string is handed to
// parseNullTimeWithLocation. Otherwise the string is parsed by
// parseNullTime. A string that ends with a space goes to parseNullTime and
// yields a parse error instead of an index-out-of-range panic.
func scanNullTime(v interface{}) (NullTime, error) {
	if v == nil {
		return NullTime{}, nil
	}
	vv, ok := v.(string)
	if !ok {
		return NullTime{}, fmt.Errorf("cannot convert %v (%T) to time string", v, v)
	}
	if idx := strings.LastIndex(vv, " "); idx >= 0 {
		last := vv[idx+1:]
		// Guard against an empty trailing token before looking at its first byte.
		if last != "" && !unicode.IsDigit(rune(last[0])) {
			return parseNullTimeWithLocation(vv)
		}
	}
	return parseNullTime(vv)
}
// parseNullTime parses v in the local time zone, trying each entry of
// timeLayouts in order. It returns the first successful result, or the error
// from the last layout tried when none matches.
func parseNullTime(v string) (NullTime, error) {
	var lastErr error
	for i := range timeLayouts {
		parsed, err := time.ParseInLocation(timeLayouts[i], v, time.Local)
		if err == nil {
			return NullTime{Valid: true, Time: parsed}, nil
		}
		lastErr = err
	}
	return NullTime{}, lastErr
}
// parseNullTimeWithLocation parses a string of the form "<stamp> <zone>".
// The text after the last space is loaded as a time zone with
// time.LoadLocation, and the text before it is parsed in that zone, trying
// each entry of timeLayouts in order. It returns the first successful result,
// or the error from the last layout tried when none matches.
func parseNullTimeWithLocation(v string) (NullTime, error) {
	sep := strings.LastIndex(v, " ")
	if sep < 0 {
		return NullTime{}, fmt.Errorf("cannot convert %v (%T) to time+zone", v, v)
	}

	zone := v[sep+1:]
	loc, err := time.LoadLocation(zone)
	if err != nil {
		return NullTime{}, fmt.Errorf("cannot load timezone %q: %v", zone, err)
	}

	stamp := v[:sep]
	for _, layout := range timeLayouts {
		var parsed time.Time
		if parsed, err = time.ParseInLocation(layout, stamp, loc); err == nil {
			return NullTime{Valid: true, Time: parsed}, nil
		}
	}
	return NullTime{}, err
}
// NullTime represents a time.Time value that can be null.
// The NullTime supports Trino's Date, Time and Timestamp data types,
// with or without time zone.
type NullTime struct {
	Time  time.Time
	Valid bool // Valid is true if Time is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (zero Time, Valid false),
// so a scanner reused across rows never exposes the previous row's time.
// A time.Time is stored with Valid set to true, and a NullTime is copied as
// is. Values of any other type are ignored: the receiver is left unchanged
// and no error is returned.
func (s *NullTime) Scan(value interface{}) error {
	switch t := value.(type) {
	case nil:
		*s = NullTime{}
	case time.Time:
		s.Time, s.Valid = t, true
	case NullTime:
		*s = t
	}
	return nil
}
// NullSliceTime represents a slice of time.Time that may be null.
// Individual elements may be null as well, hence []NullTime.
type NullSliceTime struct {
	SliceTime []NullTime
	Valid     bool // Valid is true if SliceTime is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that scanNullTime
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSliceTime) Scan(value interface{}) error {
	if value == nil {
		*s = NullSliceTime{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to []time.Time", value, value)
	}
	slice := make([]NullTime, len(vs))
	for i := range vs {
		v, err := scanNullTime(vs[i])
		if err != nil {
			return err
		}
		slice[i] = v
	}
	s.SliceTime = slice
	s.Valid = true
	return nil
}
// NullSlice2Time represents a two-dimensional slice of time.Time that may be null.
type NullSlice2Time struct {
	Slice2Time [][]NullTime
	Valid      bool // Valid is true if Slice2Time is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or a row that NullSliceTime.Scan
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSlice2Time) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice2Time{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][]time.Time", value, value)
	}
	slice := make([][]NullTime, len(vs))
	for i := range vs {
		// A fresh scanner per row; a NULL row leaves its slice nil.
		var ss NullSliceTime
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.SliceTime
	}
	s.Slice2Time = slice
	s.Valid = true
	return nil
}
// NullSlice3Time represents a three-dimensional slice of time.Time that may be null.
type NullSlice3Time struct {
	Slice3Time [][][]NullTime
	Valid      bool // Valid is true if Slice3Time is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that
// NullSlice2Time.Scan rejects, produces an error and leaves the receiver
// unchanged.
func (s *NullSlice3Time) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice3Time{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][][]time.Time", value, value)
	}
	slice := make([][][]NullTime, len(vs))
	for i := range vs {
		// A fresh scanner per element; a NULL element leaves its slice nil.
		var ss NullSlice2Time
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.Slice2Time
	}
	s.Slice3Time = slice
	s.Valid = true
	return nil
}
// NullMap represents a map type that may be null.
type NullMap struct {
	Map   map[string]interface{}
	Valid bool // Valid is true if Map is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil Map, Valid false),
// so a scanner reused across rows never exposes the previous row's map.
// A map[string]interface{} is stored with Valid set to true. A value of any
// other type also leaves the receiver in the NULL state. Scan never returns
// an error.
func (m *NullMap) Scan(v interface{}) error {
	if v == nil {
		*m = NullMap{}
		return nil
	}
	// A failed assertion yields (nil, false), i.e. the NULL state.
	m.Map, m.Valid = v.(map[string]interface{})
	return nil
}
// NullSliceMap represents a slice of NullMap that may be null.
type NullSliceMap struct {
	SliceMap []NullMap
	Valid    bool // Valid is true if SliceMap is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that validateMap
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSliceMap) Scan(value interface{}) error {
	if value == nil {
		*s = NullSliceMap{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to []NullMap", value, value)
	}
	slice := make([]NullMap, len(vs))
	for i := range vs {
		if err := validateMap(vs[i]); err != nil {
			return fmt.Errorf("cannot convert %v (%T) to []NullMap", value, value)
		}
		var m NullMap
		// NullMap.Scan always returns nil, so its result can be ignored.
		_ = m.Scan(vs[i])
		slice[i] = m
	}
	s.SliceMap = slice
	s.Valid = true
	return nil
}
// NullSlice2Map represents a two-dimensional slice of NullMap that may be null.
type NullSlice2Map struct {
	Slice2Map [][]NullMap
	Valid     bool // Valid is true if Slice2Map is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or a row that NullSliceMap.Scan
// rejects, produces an error and leaves the receiver unchanged.
func (s *NullSlice2Map) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice2Map{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][]NullMap", value, value)
	}
	slice := make([][]NullMap, len(vs))
	for i := range vs {
		// A fresh scanner per row; a NULL row leaves its slice nil.
		var ss NullSliceMap
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.SliceMap
	}
	s.Slice2Map = slice
	s.Valid = true
	return nil
}
// NullSlice3Map represents a three-dimensional slice of NullMap that may be null.
type NullSlice3Map struct {
	Slice3Map [][][]NullMap
	Valid     bool // Valid is true if Slice3Map is not NULL
}

// Scan implements the sql.Scanner interface.
//
// A nil value resets the receiver to the NULL state (nil slice, Valid false),
// so a scanner reused across rows never exposes the previous row's data.
// A value that is not a []interface{}, or an element that
// NullSlice2Map.Scan rejects, produces an error and leaves the receiver
// unchanged.
func (s *NullSlice3Map) Scan(value interface{}) error {
	if value == nil {
		*s = NullSlice3Map{}
		return nil
	}
	vs, ok := value.([]interface{})
	if !ok {
		return fmt.Errorf("trino: cannot convert %v (%T) to [][][]NullMap", value, value)
	}
	slice := make([][][]NullMap, len(vs))
	for i := range vs {
		// A fresh scanner per element; a NULL element leaves its slice nil.
		var ss NullSlice2Map
		if err := ss.Scan(vs[i]); err != nil {
			return err
		}
		slice[i] = ss.Slice2Map
	}
	s.Slice3Map = slice
	s.Valid = true
	return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/github_repo/trino-golang-client.git
git@gitee.com:github_repo/trino-golang-client.git
github_repo
trino-golang-client
trino-golang-client
main

搜索帮助