当前仓库属于暂停状态,部分功能使用受限,详情请查阅 仓库状态说明
15 Star 22 Fork 17

openGauss/openGauss-connector-go-pq
暂停

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
notify_test.go 13.08 KB
一键复制 编辑 原始数据 按行查看 历史
travelliu 提交于 2022-02-11 15:40 +08:00 . feat: add custom tls
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
package pq
import (
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"runtime"
"sync"
"testing"
"time"
)
var errNilNotification = errors.New("nil notification")
func expectNotification(t *testing.T, ch <-chan *Notification, relname string, extra string) error {
select {
case n := <-ch:
if n == nil {
return errNilNotification
}
if n.Channel != relname || n.Extra != extra {
return fmt.Errorf("unexpected notification %v", n)
}
return nil
case <-time.After(1500 * time.Millisecond):
return fmt.Errorf("timeout")
}
}
func expectNoNotification(t *testing.T, ch <-chan *Notification) error {
select {
case n := <-ch:
return fmt.Errorf("unexpected notification %v", n)
case <-time.After(100 * time.Millisecond):
return nil
}
}
func expectEvent(t *testing.T, eventch <-chan ListenerEventType, et ListenerEventType) error {
select {
case e := <-eventch:
if e != et {
return fmt.Errorf("unexpected event %v", e)
}
return nil
case <-time.After(1500 * time.Millisecond):
panic("expectEvent timeout")
}
}
func expectNoEvent(t *testing.T, eventch <-chan ListenerEventType) error {
select {
case e := <-eventch:
return fmt.Errorf("unexpected event %v", e)
case <-time.After(100 * time.Millisecond):
return nil
}
}
func newTestListenerConn(t *testing.T) (*ListenerConn, <-chan *Notification) {
name, err := getTestDsn()
if err != nil {
t.Fatal(err)
}
notificationChan := make(chan *Notification)
l, err := NewListenerConn(name, notificationChan)
if err != nil {
t.Fatal(err)
}
return l, notificationChan
}
func TestNewListenerConn(t *testing.T) {
l, _ := newTestListenerConn(t)
defer l.Close()
}
// func TestConnListen(t *testing.T) {
// l, channel := newTestListenerConn(t)
//
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// ok, err := l.Listen("notify_test")
// if !ok || err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, channel, "notify_test", "")
// if err != nil {
// t.Fatal(err)
// }
// }
// func TestConnUnListen(t *testing.T) {
// l, channel := newTestListenerConn(t)
//
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// ok, err := l.Listen("notify_test")
// if !ok || err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, channel, "notify_test", "")
// if err != nil {
// t.Fatal(err)
// }
//
// ok, err = l.Unlisten("notify_test")
// if !ok || err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNoNotification(t, channel)
// if err != nil {
// t.Fatal(err)
// }
// }
// func TestConnUnListenAll(t *testing.T) {
// l, channel := newTestListenerConn(t)
//
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// ok, err := l.Listen("notify_test")
// if !ok || err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, channel, "notify_test", "")
// if err != nil {
// t.Fatal(err)
// }
//
// ok, err = l.UnlistenAll()
// if !ok || err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNoNotification(t, channel)
// if err != nil {
// t.Fatal(err)
// }
// }
func TestConnClose(t *testing.T) {
l, _ := newTestListenerConn(t)
defer l.Close()
err := l.Close()
if err != nil {
t.Fatal(err)
}
err = l.Close()
if err != errListenerConnClosed {
t.Fatalf("expected errListenerConnClosed; got %v", err)
}
}
func TestConnPing(t *testing.T) {
l, _ := newTestListenerConn(t)
defer l.Close()
err := l.Ping()
if err != nil {
t.Fatal(err)
}
err = l.Close()
if err != nil {
t.Fatal(err)
}
err = l.Ping()
if err != errListenerConnClosed {
t.Fatalf("expected errListenerConnClosed; got %v", err)
}
}
// Test for deadlock where a query fails while another one is queued
func TestConnExecDeadlock(t *testing.T) {
l, _ := newTestListenerConn(t)
defer l.Close()
var wg sync.WaitGroup
wg.Add(2)
go func() {
l.ExecSimpleQuery("SELECT pg_sleep(60)")
wg.Done()
}()
runtime.Gosched()
go func() {
l.ExecSimpleQuery("SELECT 1")
wg.Done()
}()
// give the two goroutines some time to get into position
runtime.Gosched()
// calls Close on the net.Conn; equivalent to a network failure
l.Close()
defer time.AfterFunc(10*time.Second, func() {
panic("timed out")
}).Stop()
wg.Wait()
}
// Test for ListenerConn being closed while a slow query is executing
func TestListenerConnCloseWhileQueryIsExecuting(t *testing.T) {
l, _ := newTestListenerConn(t)
defer l.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
sent, err := l.ExecSimpleQuery("SELECT pg_sleep(60)")
if sent {
panic("expected sent=false")
}
// could be any of a number of errors
if err == nil {
panic("expected error")
}
wg.Done()
}()
// give the above goroutine some time to get into position
runtime.Gosched()
err := l.Close()
if err != nil {
t.Fatal(err)
}
defer time.AfterFunc(10*time.Second, func() {
panic("timed out")
}).Stop()
wg.Wait()
}
// func TestNotifyExtra(t *testing.T) {
// db := openTestConn(t)
// defer db.Close()
//
// if getServerVersion(t, db) < 90000 {
// t.Skip("skipping NOTIFY payload test since the server does not appear to support it")
// }
//
// l, channel := newTestListenerConn(t)
// defer l.Close()
//
// ok, err := l.Listen("notify_test")
// if !ok || err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_test, 'something'")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, channel, "notify_test", "something")
// if err != nil {
// t.Fatal(err)
// }
// }
// create a new test listener and also set the timeouts
func newTestListenerTimeout(t *testing.T, min time.Duration, max time.Duration) (*Listener, <-chan ListenerEventType) {
// datname := os.Getenv("PGDATABASE")
// sslmode := os.Getenv("PGSSLMODE")
//
// if datname == "" {
// os.Setenv("PGDATABASE", "pqgotest")
// }
//
// if sslmode == "" {
// os.Setenv("PGSSLMODE", "disable")
// }
name, err := getTestDsn()
if err != nil {
t.Fatal(err)
}
eventch := make(chan ListenerEventType, 16)
l := NewListener(name, min, max, func(t ListenerEventType, err error) { eventch <- t })
err = expectEvent(t, eventch, ListenerEventConnected)
if err != nil {
t.Fatal(err)
}
return l, eventch
}
func newTestListener(t *testing.T) (*Listener, <-chan ListenerEventType) {
return newTestListenerTimeout(t, time.Hour, time.Hour)
}
// func TestListenerListen(t *testing.T) {
// l, _ := newTestListener(t)
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// err := l.Listen("notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, l.Notify, "notify_listen_test", "")
// if err != nil {
// t.Fatal(err)
// }
// }
// func TestListenerUnlisten(t *testing.T) {
// l, _ := newTestListener(t)
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// err := l.Listen("notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = l.Unlisten("notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, l.Notify, "notify_listen_test", "")
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNoNotification(t, l.Notify)
// if err != nil {
// t.Fatal(err)
// }
// }
// func TestListenerUnListenAll(t *testing.T) {
// l, _ := newTestListener(t)
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// err := l.Listen("notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = l.UnlistenAll()
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, l.Notify, "notify_listen_test", "")
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNoNotification(t, l.Notify)
// if err != nil {
// t.Fatal(err)
// }
// }
// func TestListenerFailedQuery(t *testing.T) {
// l, eventch := newTestListener(t)
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// err := l.Listen("notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, l.Notify, "notify_listen_test", "")
// if err != nil {
// t.Fatal(err)
// }
//
// // shouldn't cause a disconnect
// ok, err := l.cn.ExecSimpleQuery("SELECT error")
// if !ok {
// t.Fatalf("could not send query to server: %v", err)
// }
// _, ok = err.(PGError)
// if !ok {
// t.Fatalf("unexpected error %v", err)
// }
// err = expectNoEvent(t, eventch)
// if err != nil {
// t.Fatal(err)
// }
//
// // should still work
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, l.Notify, "notify_listen_test", "")
// if err != nil {
// t.Fatal(err)
// }
// }
// func TestListenerReconnect(t *testing.T) {
// l, eventch := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
// defer l.Close()
//
// db := openTestConn(t)
// defer db.Close()
//
// err := l.Listen("notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// err = expectNotification(t, l.Notify, "notify_listen_test", "")
// if err != nil {
// t.Fatal(err)
// }
//
// // kill the connection and make sure it comes back up
// ok, err := l.cn.ExecSimpleQuery("SELECT pg_terminate_backend(pg_backend_pid())")
// if ok {
// t.Fatalf("could not kill the connection: %v", err)
// }
// if err != io.EOF {
// t.Fatalf("unexpected error %v", err)
// }
// err = expectEvent(t, eventch, ListenerEventDisconnected)
// if err != nil {
// t.Fatal(err)
// }
// err = expectEvent(t, eventch, ListenerEventReconnected)
// if err != nil {
// t.Fatal(err)
// }
//
// // should still work
// _, err = db.Exec("NOTIFY notify_listen_test")
// if err != nil {
// t.Fatal(err)
// }
//
// // should get nil after Reconnected
// err = expectNotification(t, l.Notify, "", "")
// if err != errNilNotification {
// t.Fatal(err)
// }
//
// err = expectNotification(t, l.Notify, "notify_listen_test", "")
// if err != nil {
// t.Fatal(err)
// }
// }
func TestListenerClose(t *testing.T) {
l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
defer l.Close()
err := l.Close()
if err != nil {
t.Fatal(err)
}
err = l.Close()
if err != errListenerClosed {
t.Fatalf("expected errListenerClosed; got %v", err)
}
}
func TestListenerPing(t *testing.T) {
l, _ := newTestListenerTimeout(t, 20*time.Millisecond, time.Hour)
defer l.Close()
err := l.Ping()
if err != nil {
t.Fatal(err)
}
err = l.Close()
if err != nil {
t.Fatal(err)
}
err = l.Ping()
if err != errListenerClosed {
t.Fatalf("expected errListenerClosed; got %v", err)
}
}
// func TestConnectorWithNotificationHandler_Simple(t *testing.T) {
// b, err := NewConnector("")
// if err != nil {
// t.Fatal(err)
// }
// var notification *Notification
// // Make connector w/ handler to set the local var
// c := ConnectorWithNotificationHandler(b, func(n *Notification) { notification = n })
// sendNotification(c, t, "Test notification #1")
// if notification == nil || notification.Extra != "Test notification #1" {
// t.Fatalf("Expected notification w/ message, got %v", notification)
// }
// // Unset the handler on the same connector
// prevC := c
// if c = ConnectorWithNotificationHandler(c, nil); c != prevC {
// t.Fatalf("Expected to not create new connector but did")
// }
// sendNotification(c, t, "Test notification #2")
// if notification == nil || notification.Extra != "Test notification #1" {
// t.Fatalf("Expected notification to not change, got %v", notification)
// }
// // Set it back on the same connector
// if c = ConnectorWithNotificationHandler(c, func(n *Notification) { notification = n }); c != prevC {
// t.Fatal("Expected to not create new connector but did")
// }
// sendNotification(c, t, "Test notification #3")
// if notification == nil || notification.Extra != "Test notification #3" {
// t.Fatalf("Expected notification w/ message, got %v", notification)
// }
// }
func sendNotification(c driver.Connector, t *testing.T, escapedNotification string) {
db := sql.OpenDB(c)
defer db.Close()
sql := fmt.Sprintf("LISTEN foo; NOTIFY foo, '%s';", escapedNotification)
if _, err := db.Exec(sql); err != nil {
t.Fatal(err)
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/opengauss/openGauss-connector-go-pq.git
git@gitee.com:opengauss/openGauss-connector-go-pq.git
opengauss
openGauss-connector-go-pq
openGauss-connector-go-pq
master

搜索帮助