1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package sql
17
18 import (
19 "context"
20 "database/sql/driver"
21 "errors"
22 "fmt"
23 "io"
24 "reflect"
25 "runtime"
26 "sort"
27 "strconv"
28 "sync"
29 "sync/atomic"
30 "time"
31 )
32
// driversMu guards drivers, the registry of database drivers keyed by the
// name they were registered under (see Register).
var (
	driversMu sync.RWMutex
	drivers   = make(map[string]driver.Driver)
)
37
38
// nowFunc returns the current time. It is a variable rather than a direct
// call to time.Now so that it can be replaced (presumably by tests — confirm).
var nowFunc = time.Now
40
41
42
43
44 func Register(name string, driver driver.Driver) {
45 driversMu.Lock()
46 defer driversMu.Unlock()
47 if driver == nil {
48 panic("sql: Register driver is nil")
49 }
50 if _, dup := drivers[name]; dup {
51 panic("sql: Register called twice for driver " + name)
52 }
53 drivers[name] = driver
54 }
55
56 func unregisterAllDrivers() {
57 driversMu.Lock()
58 defer driversMu.Unlock()
59
60 drivers = make(map[string]driver.Driver)
61 }
62
63
64 func Drivers() []string {
65 driversMu.RLock()
66 defer driversMu.RUnlock()
67 list := make([]string, 0, len(drivers))
68 for name := range drivers {
69 list = append(list, name)
70 }
71 sort.Strings(list)
72 return list
73 }
74
75
76
77
78
79
80
// NamedArg pairs a parameter name with its value. Values of this type are
// built by Named.
type NamedArg struct {
	// _NamedFieldsRequired is an unexported zero-size first field; it
	// prevents code outside this package from writing unkeyed NamedArg
	// literals.
	_NamedFieldsRequired struct{}

	// Name is the name of the parameter.
	// NOTE(review): the accepted syntax (e.g. whether a symbol prefix is
	// allowed) is enforced elsewhere — confirm against the argument
	// conversion code.
	Name string

	// Value is the value bound to the parameter.
	Value any
}
97
98
99
100
101
102
103
104
105
106
107
108
109
110 func Named(name string, value any) NamedArg {
111
112
113
114
115 return NamedArg{Name: name, Value: value}
116 }
117
118
// IsolationLevel is the transaction isolation level carried by TxOptions.
type IsolationLevel int

// The isolation levels known to this package, numbered consecutively from
// zero. LevelDefault is the zero value.
const (
	LevelDefault IsolationLevel = iota
	LevelReadUncommitted
	LevelReadCommitted
	LevelWriteCommitted
	LevelRepeatableRead
	LevelSnapshot
	LevelSerializable
	LevelLinearizable
)

// String returns the human-readable name of the isolation level. Values
// that are not one of the declared constants are rendered as
// "IsolationLevel(N)".
func (i IsolationLevel) String() string {
	names := [...]string{
		LevelDefault:         "Default",
		LevelReadUncommitted: "Read Uncommitted",
		LevelReadCommitted:   "Read Committed",
		LevelWriteCommitted:  "Write Committed",
		LevelRepeatableRead:  "Repeatable Read",
		LevelSnapshot:        "Snapshot",
		LevelSerializable:    "Serializable",
		LevelLinearizable:    "Linearizable",
	}
	if i < 0 || int(i) >= len(names) {
		return "IsolationLevel(" + strconv.Itoa(int(i)) + ")"
	}
	return names[i]
}

// Compile-time check that IsolationLevel satisfies fmt.Stringer.
var _ fmt.Stringer = LevelDefault
161
162
// TxOptions holds the options passed to DB.BeginTx when starting a
// transaction.
type TxOptions struct {
	// Isolation is the requested transaction isolation level. Its zero
	// value is LevelDefault.
	Isolation IsolationLevel
	// ReadOnly requests a read-only transaction.
	// NOTE(review): how drivers honor this is not visible here — confirm.
	ReadOnly bool
}
169
170
171
172
// RawBytes is a byte slice type used by this package.
// NOTE(review): presumably a scan destination that aliases driver-owned
// memory, so its contents are only valid until the next row operation —
// confirm against the scanning code before relying on that.
type RawBytes []byte
174
175
176
177
178
179
180
181
182
183
184
185
186
187 type NullString struct {
188 String string
189 Valid bool
190 }
191
192
193 func (ns *NullString) Scan(value any) error {
194 if value == nil {
195 ns.String, ns.Valid = "", false
196 return nil
197 }
198 ns.Valid = true
199 return convertAssign(&ns.String, value)
200 }
201
202
203 func (ns NullString) Value() (driver.Value, error) {
204 if !ns.Valid {
205 return nil, nil
206 }
207 return ns.String, nil
208 }
209
210
211
212
213 type NullInt64 struct {
214 Int64 int64
215 Valid bool
216 }
217
218
219 func (n *NullInt64) Scan(value any) error {
220 if value == nil {
221 n.Int64, n.Valid = 0, false
222 return nil
223 }
224 n.Valid = true
225 return convertAssign(&n.Int64, value)
226 }
227
228
229 func (n NullInt64) Value() (driver.Value, error) {
230 if !n.Valid {
231 return nil, nil
232 }
233 return n.Int64, nil
234 }
235
236
237
238
239 type NullInt32 struct {
240 Int32 int32
241 Valid bool
242 }
243
244
245 func (n *NullInt32) Scan(value any) error {
246 if value == nil {
247 n.Int32, n.Valid = 0, false
248 return nil
249 }
250 n.Valid = true
251 return convertAssign(&n.Int32, value)
252 }
253
254
255 func (n NullInt32) Value() (driver.Value, error) {
256 if !n.Valid {
257 return nil, nil
258 }
259 return int64(n.Int32), nil
260 }
261
262
263
264
265 type NullInt16 struct {
266 Int16 int16
267 Valid bool
268 }
269
270
271 func (n *NullInt16) Scan(value any) error {
272 if value == nil {
273 n.Int16, n.Valid = 0, false
274 return nil
275 }
276 err := convertAssign(&n.Int16, value)
277 n.Valid = err == nil
278 return err
279 }
280
281
282 func (n NullInt16) Value() (driver.Value, error) {
283 if !n.Valid {
284 return nil, nil
285 }
286 return int64(n.Int16), nil
287 }
288
289
290
291
292 type NullByte struct {
293 Byte byte
294 Valid bool
295 }
296
297
298 func (n *NullByte) Scan(value any) error {
299 if value == nil {
300 n.Byte, n.Valid = 0, false
301 return nil
302 }
303 err := convertAssign(&n.Byte, value)
304 n.Valid = err == nil
305 return err
306 }
307
308
309 func (n NullByte) Value() (driver.Value, error) {
310 if !n.Valid {
311 return nil, nil
312 }
313 return int64(n.Byte), nil
314 }
315
316
317
318
319 type NullFloat64 struct {
320 Float64 float64
321 Valid bool
322 }
323
324
325 func (n *NullFloat64) Scan(value any) error {
326 if value == nil {
327 n.Float64, n.Valid = 0, false
328 return nil
329 }
330 n.Valid = true
331 return convertAssign(&n.Float64, value)
332 }
333
334
335 func (n NullFloat64) Value() (driver.Value, error) {
336 if !n.Valid {
337 return nil, nil
338 }
339 return n.Float64, nil
340 }
341
342
343
344
345 type NullBool struct {
346 Bool bool
347 Valid bool
348 }
349
350
351 func (n *NullBool) Scan(value any) error {
352 if value == nil {
353 n.Bool, n.Valid = false, false
354 return nil
355 }
356 n.Valid = true
357 return convertAssign(&n.Bool, value)
358 }
359
360
361 func (n NullBool) Value() (driver.Value, error) {
362 if !n.Valid {
363 return nil, nil
364 }
365 return n.Bool, nil
366 }
367
368
369
370
371 type NullTime struct {
372 Time time.Time
373 Valid bool
374 }
375
376
377 func (n *NullTime) Scan(value any) error {
378 if value == nil {
379 n.Time, n.Valid = time.Time{}, false
380 return nil
381 }
382 n.Valid = true
383 return convertAssign(&n.Time, value)
384 }
385
386
387 func (n NullTime) Value() (driver.Value, error) {
388 if !n.Valid {
389 return nil, nil
390 }
391 return n.Time, nil
392 }
393
394
// Scanner is implemented by types that can receive a value read from the
// database. The Null* types in this file implement it.
type Scanner interface {
	// Scan assigns src to the receiver and returns an error if the value
	// cannot be stored. The Null* implementations treat a nil src as
	// database NULL.
	// NOTE(review): the full set of dynamic types src may hold is
	// defined by the driver layer and is not visible here.
	Scan(src any) error
}
416
417
418
419
420
421
422
423
424
// Out wraps a destination that receives a value produced by the database.
// NOTE(review): presumably used for stored-procedure OUTPUT parameters —
// confirm against the argument conversion code.
type Out struct {
	// _NamedFieldsRequired is an unexported zero-size first field; it
	// prevents code outside this package from writing unkeyed Out
	// literals.
	_NamedFieldsRequired struct{}

	// Dest is the destination for the output value.
	// NOTE(review): presumably must be a pointer — confirm.
	Dest any

	// In presumably marks the parameter as INOUT, with the input taken
	// from what Dest points to — confirm against the consumer.
	In bool
}
437
438
439
440
// ErrNoRows is the sentinel error reporting that a result set contained
// no rows. Compare against it with errors.Is.
var ErrNoRows = errors.New("sql: no rows in result set")
442
443
444
445
446
447
448
449
450
451
452
453
454
// DB is a database handle that manages a pool of driver connections.
// It is created by Open or OpenDB and shut down by Close.
type DB struct {
	// waitDuration is the total time callers have spent waiting for a
	// connection, in nanoseconds. It is accessed only through sync/atomic
	// and is kept first in the struct, presumably so the 64-bit value
	// stays aligned on 32-bit platforms.
	waitDuration int64

	// connector creates new driver connections.
	connector driver.Connector

	// numClosed counts connections that have been finally closed. It is
	// updated through sync/atomic.
	numClosed uint64

	mu           sync.Mutex                  // protects the fields below
	freeConn     []*driverConn               // idle connections; conn reuses from the end
	connRequests map[uint64]chan connRequest // callers waiting for a connection
	nextRequest  uint64                      // next key to use in connRequests
	numOpen      int                         // opened plus pending-open connections

	// openerCh signals connectionOpener that a new connection is wanted.
	// maybeOpenNewConnections sends on it once per connection (always
	// with mu held); the goroutine started by OpenDB receives.
	openerCh          chan struct{}
	closed            bool                     // set by Close
	dep               map[finalCloser]depSet   // outstanding dependencies per resource
	lastPut           map[*driverConn]string   // stack of last putConn; debugGetPut only
	maxIdleCount      int                      // 0 means defaultMaxIdleConns; negative means 0
	maxOpen           int                      // <= 0 means unlimited
	maxLifetime       time.Duration            // max time a connection may be reused; <= 0 means no limit
	maxIdleTime       time.Duration            // max time a connection may sit idle; <= 0 means no limit
	cleanerCh         chan struct{}            // wakes connectionCleaner; nil when no cleaner runs
	waitCount         int64                    // total number of connection waits
	maxIdleClosed     int64                    // connections closed due to the idle-count limit
	maxIdleTimeClosed int64                    // connections closed due to the idle-time limit
	maxLifetimeClosed int64                    // connections closed due to the lifetime limit

	// stop cancels the context handed to connectionOpener.
	stop func()
}
492
493
// connReuseStrategy tells DB.conn whether it may hand out a pooled
// connection.
type connReuseStrategy uint8

const (
	// alwaysNewConn makes conn skip the free list, so the caller gets a
	// newly established connection (or waits for one if the pool is at
	// its open limit).
	alwaysNewConn connReuseStrategy = iota

	// cachedOrNewConn lets conn return an idle connection from the free
	// list when one exists; otherwise it behaves like alwaysNewConn.
	cachedOrNewConn
)
504
505
506
507
508
// driverConn wraps a driver.Conn together with the pool bookkeeping the
// DB keeps for it. The embedded mutex serializes use of the driver
// connection.
type driverConn struct {
	db        *DB       // owning pool
	createdAt time.Time // when the connection was established; used by expired

	sync.Mutex                      // guards the following fields
	ci          driver.Conn         // underlying driver connection; nil after finalClose
	needReset   bool                // set by validateConnection; makes resetSession call the driver
	closed      bool                // set by Close and closeDBLocked
	finalClosed bool                // set once ci.Close has been called
	openStmt    map[*driverStmt]bool // statements to close in finalClose

	// The fields below are read and written with db.mu held.
	inUse      bool      // true while the connection is checked out
	returnedAt time.Time // when the connection was created or last returned
	onPut      []func()  // callbacks run by the next putConn
	dbmuClosed bool      // set in Close under db.mu
}
526
527 func (dc *driverConn) releaseConn(err error) {
528 dc.db.putConn(dc, err, true)
529 }
530
531 func (dc *driverConn) removeOpenStmt(ds *driverStmt) {
532 dc.Lock()
533 defer dc.Unlock()
534 delete(dc.openStmt, ds)
535 }
536
537 func (dc *driverConn) expired(timeout time.Duration) bool {
538 if timeout <= 0 {
539 return false
540 }
541 return dc.createdAt.Add(timeout).Before(nowFunc())
542 }
543
544
545
546 func (dc *driverConn) resetSession(ctx context.Context) error {
547 dc.Lock()
548 defer dc.Unlock()
549
550 if !dc.needReset {
551 return nil
552 }
553 if cr, ok := dc.ci.(driver.SessionResetter); ok {
554 return cr.ResetSession(ctx)
555 }
556 return nil
557 }
558
559
560
561 func (dc *driverConn) validateConnection(needsReset bool) bool {
562 dc.Lock()
563 defer dc.Unlock()
564
565 if needsReset {
566 dc.needReset = true
567 }
568 if cv, ok := dc.ci.(driver.Validator); ok {
569 return cv.IsValid()
570 }
571 return true
572 }
573
574
575
576 func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
577 si, err := ctxDriverPrepare(ctx, dc.ci, query)
578 if err != nil {
579 return nil, err
580 }
581 ds := &driverStmt{Locker: dc, si: si}
582
583
584 if cg != nil {
585 return ds, nil
586 }
587
588
589
590
591
592 if dc.openStmt == nil {
593 dc.openStmt = make(map[*driverStmt]bool)
594 }
595 dc.openStmt[ds] = true
596 return ds, nil
597 }
598
599
600 func (dc *driverConn) closeDBLocked() func() error {
601 dc.Lock()
602 defer dc.Unlock()
603 if dc.closed {
604 return func() error { return errors.New("sql: duplicate driverConn close") }
605 }
606 dc.closed = true
607 return dc.db.removeDepLocked(dc, dc)
608 }
609
610 func (dc *driverConn) Close() error {
611 dc.Lock()
612 if dc.closed {
613 dc.Unlock()
614 return errors.New("sql: duplicate driverConn close")
615 }
616 dc.closed = true
617 dc.Unlock()
618
619
620 dc.db.mu.Lock()
621 dc.dbmuClosed = true
622 fn := dc.db.removeDepLocked(dc, dc)
623 dc.db.mu.Unlock()
624 return fn()
625 }
626
// finalClose releases the driver-level resources of the connection: it
// closes every statement still recorded in openStmt, closes the driver
// connection, and updates the pool's accounting. It returns the error
// from closing the driver connection.
func (dc *driverConn) finalClose() error {
	var err error

	// Snapshot and clear openStmt under dc's lock, then close the
	// statements outside that critical section: driverStmt.Close takes
	// its own Locker, which prepareLocked sets to dc itself, so closing
	// them while holding the lock would deadlock.
	var openStmt []*driverStmt
	withLock(dc, func() {
		openStmt = make([]*driverStmt, 0, len(dc.openStmt))
		for ds := range dc.openStmt {
			openStmt = append(openStmt, ds)
		}
		dc.openStmt = nil
	})
	for _, ds := range openStmt {
		ds.Close()
	}
	withLock(dc, func() {
		dc.finalClosed = true
		err = dc.ci.Close()
		dc.ci = nil
	})

	// One fewer open connection: give a waiting request the chance to
	// have a replacement opened.
	dc.db.mu.Lock()
	dc.db.numOpen--
	dc.db.maybeOpenNewConnections()
	dc.db.mu.Unlock()

	atomic.AddUint64(&dc.db.numClosed, 1)
	return err
}
657
658
659
660
// driverStmt pairs a driver.Stmt with the Locker that must be held while
// the statement is used, and remembers the outcome of closing it.
type driverStmt struct {
	sync.Locker // held while si is used
	si          driver.Stmt
	closed      bool
	closeErr    error // result of the first Close
}

// Close closes the underlying driver statement exactly once. Every call,
// including repeated ones, returns the error produced by that first
// close.
func (ds *driverStmt) Close() error {
	ds.Lock()
	defer ds.Unlock()

	if !ds.closed {
		ds.closed = true
		ds.closeErr = ds.si.Close()
	}
	return ds.closeErr
}
680
681
// depSet is the set of things that currently depend on a finalCloser;
// the keys are the dependents (see DB.addDepLocked).
type depSet map[any]bool
683
684
685
// finalCloser is a resource whose real teardown is deferred until
// nothing depends on it any more. DB.removeDepLocked hands back its
// finalClose method when the last dependency is removed.
type finalCloser interface {
	// finalClose performs the actual close. removeDep and
	// driverConn.Close invoke the function returned by removeDepLocked
	// only after releasing db.mu.
	finalClose() error
}
691
692
693
// addDep records that dep depends on x, so that x is not finally closed
// until dep has been removed again with removeDep. It takes db.mu; use
// addDepLocked when the lock is already held.
func (db *DB) addDep(x finalCloser, dep any) {
	db.mu.Lock()
	defer db.mu.Unlock()
	db.addDepLocked(x, dep)
}
699
700 func (db *DB) addDepLocked(x finalCloser, dep any) {
701 if db.dep == nil {
702 db.dep = make(map[finalCloser]depSet)
703 }
704 xdep := db.dep[x]
705 if xdep == nil {
706 xdep = make(depSet)
707 db.dep[x] = xdep
708 }
709 xdep[dep] = true
710 }
711
712
713
714
715
716 func (db *DB) removeDep(x finalCloser, dep any) error {
717 db.mu.Lock()
718 fn := db.removeDepLocked(x, dep)
719 db.mu.Unlock()
720 return fn()
721 }
722
723 func (db *DB) removeDepLocked(x finalCloser, dep any) func() error {
724 xdep, ok := db.dep[x]
725 if !ok {
726 panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
727 }
728
729 l0 := len(xdep)
730 delete(xdep, dep)
731
732 switch len(xdep) {
733 case l0:
734
735 panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
736 case 0:
737
738 delete(db.dep, x)
739 return x.finalClose
740 default:
741
742 return func() error { return nil }
743 }
744 }
745
746
747
748
749
750
// connectionRequestQueueSize is the buffer size OpenDB gives to
// DB.openerCh. maybeOpenNewConnections sends on that channel, so a send
// blocks once this many open requests are queued.
var connectionRequestQueueSize = 1000000
752
// dsnConnector adapts a driver.Driver plus a data source name to the
// driver.Connector interface. Open uses it for drivers that do not
// implement driver.DriverContext.
type dsnConnector struct {
	dsn    string
	driver driver.Driver
}

// Connect opens a new connection by handing the stored DSN to the
// driver. The context is ignored because driver.Driver.Open does not
// accept one.
func (c dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
	drv := c.Driver()
	return drv.Open(c.dsn)
}

// Driver returns the underlying driver.
func (c dsnConnector) Driver() driver.Driver {
	return c.driver
}
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782 func OpenDB(c driver.Connector) *DB {
783 ctx, cancel := context.WithCancel(context.Background())
784 db := &DB{
785 connector: c,
786 openerCh: make(chan struct{}, connectionRequestQueueSize),
787 lastPut: make(map[*driverConn]string),
788 connRequests: make(map[uint64]chan connRequest),
789 stop: cancel,
790 }
791
792 go db.connectionOpener(ctx)
793
794 return db
795 }
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814 func Open(driverName, dataSourceName string) (*DB, error) {
815 driversMu.RLock()
816 driveri, ok := drivers[driverName]
817 driversMu.RUnlock()
818 if !ok {
819 return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
820 }
821
822 if driverCtx, ok := driveri.(driver.DriverContext); ok {
823 connector, err := driverCtx.OpenConnector(dataSourceName)
824 if err != nil {
825 return nil, err
826 }
827 return OpenDB(connector), nil
828 }
829
830 return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
831 }
832
833 func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
834 var err error
835 if pinger, ok := dc.ci.(driver.Pinger); ok {
836 withLock(dc, func() {
837 err = pinger.Ping(ctx)
838 })
839 }
840 release(err)
841 return err
842 }
843
844
845
846 func (db *DB) PingContext(ctx context.Context) error {
847 var dc *driverConn
848 var err error
849 var isBadConn bool
850 for i := 0; i < maxBadConnRetries; i++ {
851 dc, err = db.conn(ctx, cachedOrNewConn)
852 isBadConn = errors.Is(err, driver.ErrBadConn)
853 if !isBadConn {
854 break
855 }
856 }
857 if isBadConn {
858 dc, err = db.conn(ctx, alwaysNewConn)
859 }
860 if err != nil {
861 return err
862 }
863
864 return db.pingDC(ctx, dc, dc.releaseConn)
865 }
866
867
868
869
870
871
872 func (db *DB) Ping() error {
873 return db.PingContext(context.Background())
874 }
875
876
877
878
879
880
881
882 func (db *DB) Close() error {
883 db.mu.Lock()
884 if db.closed {
885 db.mu.Unlock()
886 return nil
887 }
888 if db.cleanerCh != nil {
889 close(db.cleanerCh)
890 }
891 var err error
892 fns := make([]func() error, 0, len(db.freeConn))
893 for _, dc := range db.freeConn {
894 fns = append(fns, dc.closeDBLocked())
895 }
896 db.freeConn = nil
897 db.closed = true
898 for _, req := range db.connRequests {
899 close(req)
900 }
901 db.mu.Unlock()
902 for _, fn := range fns {
903 err1 := fn()
904 if err1 != nil {
905 err = err1
906 }
907 }
908 db.stop()
909 if c, ok := db.connector.(io.Closer); ok {
910 err1 := c.Close()
911 if err1 != nil {
912 err = err1
913 }
914 }
915 return err
916 }
917
// defaultMaxIdleConns is the idle-connection limit maxIdleConnsLocked
// reports while DB.maxIdleCount is still zero (never configured).
const defaultMaxIdleConns = 2
919
920 func (db *DB) maxIdleConnsLocked() int {
921 n := db.maxIdleCount
922 switch {
923 case n == 0:
924
925 return defaultMaxIdleConns
926 case n < 0:
927 return 0
928 default:
929 return n
930 }
931 }
932
933 func (db *DB) shortestIdleTimeLocked() time.Duration {
934 if db.maxIdleTime <= 0 {
935 return db.maxLifetime
936 }
937 if db.maxLifetime <= 0 {
938 return db.maxIdleTime
939 }
940
941 min := db.maxIdleTime
942 if min > db.maxLifetime {
943 min = db.maxLifetime
944 }
945 return min
946 }
947
948
949
950
951
952
953
954
955
956
957
958 func (db *DB) SetMaxIdleConns(n int) {
959 db.mu.Lock()
960 if n > 0 {
961 db.maxIdleCount = n
962 } else {
963
964 db.maxIdleCount = -1
965 }
966
967 if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
968 db.maxIdleCount = db.maxOpen
969 }
970 var closing []*driverConn
971 idleCount := len(db.freeConn)
972 maxIdle := db.maxIdleConnsLocked()
973 if idleCount > maxIdle {
974 closing = db.freeConn[maxIdle:]
975 db.freeConn = db.freeConn[:maxIdle]
976 }
977 db.maxIdleClosed += int64(len(closing))
978 db.mu.Unlock()
979 for _, c := range closing {
980 c.Close()
981 }
982 }
983
984
985
986
987
988
989
990
991
992 func (db *DB) SetMaxOpenConns(n int) {
993 db.mu.Lock()
994 db.maxOpen = n
995 if n < 0 {
996 db.maxOpen = 0
997 }
998 syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
999 db.mu.Unlock()
1000 if syncMaxIdle {
1001 db.SetMaxIdleConns(n)
1002 }
1003 }
1004
1005
1006
1007
1008
1009
1010 func (db *DB) SetConnMaxLifetime(d time.Duration) {
1011 if d < 0 {
1012 d = 0
1013 }
1014 db.mu.Lock()
1015
1016 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
1017 select {
1018 case db.cleanerCh <- struct{}{}:
1019 default:
1020 }
1021 }
1022 db.maxLifetime = d
1023 db.startCleanerLocked()
1024 db.mu.Unlock()
1025 }
1026
1027
1028
1029
1030
1031
1032 func (db *DB) SetConnMaxIdleTime(d time.Duration) {
1033 if d < 0 {
1034 d = 0
1035 }
1036 db.mu.Lock()
1037 defer db.mu.Unlock()
1038
1039
1040 if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil {
1041 select {
1042 case db.cleanerCh <- struct{}{}:
1043 default:
1044 }
1045 }
1046 db.maxIdleTime = d
1047 db.startCleanerLocked()
1048 }
1049
1050
1051 func (db *DB) startCleanerLocked() {
1052 if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
1053 db.cleanerCh = make(chan struct{}, 1)
1054 go db.connectionCleaner(db.shortestIdleTimeLocked())
1055 }
1056 }
1057
// connectionCleaner is the body of the cleaner goroutine started by
// startCleanerLocked. It periodically closes idle connections that have
// exceeded maxIdleTime or maxLifetime. d is the initial interval; runs
// are never scheduled less than minInterval apart. The goroutine exits,
// clearing db.cleanerCh, when the DB is closed, no connections are open,
// or neither limit is set any more.
func (db *DB) connectionCleaner(d time.Duration) {
	const minInterval = time.Second

	if d < minInterval {
		d = minInterval
	}
	t := time.NewTimer(d)

	for {
		// Wait for the timer, or for cleanerCh: it receives a value when
		// a limit is shortened and is closed by DB.Close.
		select {
		case <-t.C:
		case <-db.cleanerCh:
		}

		db.mu.Lock()

		d = db.shortestIdleTimeLocked()
		if db.closed || db.numOpen == 0 || d <= 0 {
			db.cleanerCh = nil
			db.mu.Unlock()
			return
		}

		d, closing := db.connectionCleanerRunLocked(d)
		db.mu.Unlock()
		// Close outside the lock: driverConn.Close takes db.mu itself.
		for _, c := range closing {
			c.Close()
		}

		if d < minInterval {
			d = minInterval
		}

		// Stop the timer and drain a value that may already have been
		// delivered, so Reset starts from a clean state.
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(d)
	}
}
1100
1101
1102
1103
// connectionCleanerRunLocked removes from db.freeConn the connections
// that have exceeded maxIdleTime or maxLifetime and returns them for the
// caller to close once db.mu is released. It also returns the delay
// until the next run: d, shortened when a remaining connection will hit
// a limit sooner. The caller holds db.mu.
func (db *DB) connectionCleanerRunLocked(d time.Duration) (time.Duration, []*driverConn) {
	var idleClosing int64
	var closing []*driverConn
	if db.maxIdleTime > 0 {
		// Scan from the most recently appended connection backwards.
		// The first connection found idle too long, together with
		// everything before it in the slice, is cut off as one prefix.
		// This relies on freeConn being ordered by returnedAt, oldest
		// first (putConnDBLocked appends on return).
		idleSince := nowFunc().Add(-db.maxIdleTime)
		last := len(db.freeConn) - 1
		for i := last; i >= 0; i-- {
			c := db.freeConn[i]
			if c.returnedAt.Before(idleSince) {
				i++
				// Cap the capacity so later appends to closing cannot
				// overwrite the connections that stay in freeConn.
				closing = db.freeConn[:i:i]
				db.freeConn = db.freeConn[i:]
				idleClosing = int64(len(closing))
				db.maxIdleTimeClosed += idleClosing
				break
			}
		}

		if len(db.freeConn) > 0 {
			c := db.freeConn[0]
			if d2 := c.returnedAt.Sub(idleSince); d2 < d {
				// The oldest remaining idle connection reaches its
				// limit before d elapses; come back in time for it.
				d = d2
			}
		}
	}

	if db.maxLifetime > 0 {
		expiredSince := nowFunc().Add(-db.maxLifetime)
		for i := 0; i < len(db.freeConn); i++ {
			c := db.freeConn[i]
			if c.createdAt.Before(expiredSince) {
				closing = append(closing, c)

				last := len(db.freeConn) - 1
				// Delete by shifting rather than swapping with the last
				// element, so the order of freeConn is preserved.
				copy(db.freeConn[i:], db.freeConn[i+1:])
				db.freeConn[last] = nil
				db.freeConn = db.freeConn[:last]
				i--
			} else if d2 := c.createdAt.Sub(expiredSince); d2 < d {
				// This connection expires before d elapses; come back
				// in time for it.
				d = d2
			}
		}
		// closing also holds the idle-time removals; count only the
		// ones added by the lifetime check.
		db.maxLifetimeClosed += int64(len(closing)) - idleClosing
	}

	return d, closing
}
1159
1160
// DBStats is the snapshot of pool statistics returned by DB.Stats.
type DBStats struct {
	MaxOpenConnections int // configured maximum of open connections; 0 means unlimited

	// Pool status.
	OpenConnections int // established plus pending-open connections
	InUse           int // OpenConnections minus Idle
	Idle            int // connections sitting in the free list

	// Counters.
	WaitCount         int64         // total number of connection waits
	WaitDuration      time.Duration // total time spent waiting for a connection
	MaxIdleClosed     int64         // connections closed due to the idle-count limit
	MaxIdleTimeClosed int64         // connections closed due to the idle-time limit
	MaxLifetimeClosed int64         // connections closed due to the lifetime limit
}
1176
1177
1178 func (db *DB) Stats() DBStats {
1179 wait := atomic.LoadInt64(&db.waitDuration)
1180
1181 db.mu.Lock()
1182 defer db.mu.Unlock()
1183
1184 stats := DBStats{
1185 MaxOpenConnections: db.maxOpen,
1186
1187 Idle: len(db.freeConn),
1188 OpenConnections: db.numOpen,
1189 InUse: db.numOpen - len(db.freeConn),
1190
1191 WaitCount: db.waitCount,
1192 WaitDuration: time.Duration(wait),
1193 MaxIdleClosed: db.maxIdleClosed,
1194 MaxIdleTimeClosed: db.maxIdleTimeClosed,
1195 MaxLifetimeClosed: db.maxLifetimeClosed,
1196 }
1197 return stats
1198 }
1199
1200
1201
1202
1203 func (db *DB) maybeOpenNewConnections() {
1204 numRequests := len(db.connRequests)
1205 if db.maxOpen > 0 {
1206 numCanOpen := db.maxOpen - db.numOpen
1207 if numRequests > numCanOpen {
1208 numRequests = numCanOpen
1209 }
1210 }
1211 for numRequests > 0 {
1212 db.numOpen++
1213 numRequests--
1214 if db.closed {
1215 return
1216 }
1217 db.openerCh <- struct{}{}
1218 }
1219 }
1220
1221
1222 func (db *DB) connectionOpener(ctx context.Context) {
1223 for {
1224 select {
1225 case <-ctx.Done():
1226 return
1227 case <-db.openerCh:
1228 db.openNewConnection(ctx)
1229 }
1230 }
1231 }
1232
1233
1234 func (db *DB) openNewConnection(ctx context.Context) {
1235
1236
1237
1238 ci, err := db.connector.Connect(ctx)
1239 db.mu.Lock()
1240 defer db.mu.Unlock()
1241 if db.closed {
1242 if err == nil {
1243 ci.Close()
1244 }
1245 db.numOpen--
1246 return
1247 }
1248 if err != nil {
1249 db.numOpen--
1250 db.putConnDBLocked(nil, err)
1251 db.maybeOpenNewConnections()
1252 return
1253 }
1254 dc := &driverConn{
1255 db: db,
1256 createdAt: nowFunc(),
1257 returnedAt: nowFunc(),
1258 ci: ci,
1259 }
1260 if db.putConnDBLocked(dc, err) {
1261 db.addDepLocked(dc, dc)
1262 } else {
1263 db.numOpen--
1264 ci.Close()
1265 }
1266 }
1267
1268
1269
1270
// connRequest is the answer delivered to a caller waiting in DB.conn for
// a connection. putConnDBLocked sends either a connection (err nil) or
// the error of a failed connection attempt (conn nil).
type connRequest struct {
	conn *driverConn
	err  error
}
1275
// errDBClosed is returned by DB.conn when the DB has been closed, either
// before the call or while the caller was waiting for a connection.
var errDBClosed = errors.New("sql: database is closed")
1277
1278
1279
1280 func (db *DB) nextRequestKeyLocked() uint64 {
1281 next := db.nextRequest
1282 db.nextRequest++
1283 return next
1284 }
1285
1286
// conn returns a connection for the caller's exclusive use: an idle one
// from the free list when strategy allows it, otherwise a newly opened
// one. When the pool is at its open limit the call waits until a
// connection is handed over, the DB is closed, or ctx is done.
//
// It returns errDBClosed for a closed DB, ctx.Err() when the context is
// done, driver.ErrBadConn when the connection it obtained turned out to
// be expired or failed its session reset (callers retry on that), or
// the connector's error when opening a connection failed.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// Fail fast if the context is already done.
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// Prefer an idle connection when the strategy permits reuse.
	last := len(db.freeConn) - 1
	if strategy == cachedOrNewConn && last >= 0 {
		// Take the most recently returned connection (end of the
		// slice), leaving the longest-idle ones for the cleaner.
		conn := db.freeConn[last]
		db.freeConn = db.freeConn[:last]
		conn.inUse = true
		if conn.expired(lifetime) {
			db.maxLifetimeClosed++
			db.mu.Unlock()
			conn.Close()
			return nil, driver.ErrBadConn
		}
		db.mu.Unlock()

		// Reset the session if one was requested when the connection
		// was returned; a bad connection is closed and reported.
		if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
			conn.Close()
			return nil, err
		}

		return conn, nil
	}

	// No reusable connection. If the open limit is reached, register a
	// request and wait for a connection to be handed over.
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// The channel is buffered so that the sender, which holds
		// db.mu, never blocks on a waiter that has gone away.
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked()
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := nowFunc()

		// Wait for a connection or for the context to end.
		select {
		case <-ctx.Done():
			// Withdraw the request. Sends happen with db.mu held, so
			// after the delete nothing more can arrive on req.
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			// A connection may have been delivered just before the
			// request was withdrawn; give it back to the pool.
			select {
			default:
			case ret, ok := <-req:
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			// DB.Close closes the channels of pending requests.
			if !ok {
				return nil, errDBClosed
			}
			// The expiry check applies only when reuse was acceptable.
			// With alwaysNewConn the delivered connection is used as
			// is; putConn checks the lifetime again when it comes back.
			if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
				db.mu.Lock()
				db.maxLifetimeClosed++
				db.mu.Unlock()
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			if ret.conn == nil {
				return nil, ret.err
			}

			// Reset the session if one was requested when the
			// connection was returned.
			if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
				ret.conn.Close()
				return nil, err
			}
			return ret.conn, ret.err
		}
	}

	// Open a new connection. Reserve the slot first so concurrent
	// callers see the updated count while the connect is in flight.
	db.numOpen++
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // give the reserved slot back
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:         db,
		createdAt:  nowFunc(),
		returnedAt: nowFunc(),
		ci:         ci,
		inUse:      true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}
1413
1414
// putConnHook, when non-nil, is called by putConn with db.mu held just
// before a healthy connection is offered back to the pool. Nothing in
// the code shown here sets it (presumably a test hook — confirm).
var putConnHook func(*DB, *driverConn)
1416
1417
1418
1419
1420 func (db *DB) noteUnusedDriverStatement(c *driverConn, ds *driverStmt) {
1421 db.mu.Lock()
1422 defer db.mu.Unlock()
1423 if c.inUse {
1424 c.onPut = append(c.onPut, func() {
1425 ds.Close()
1426 })
1427 } else {
1428 c.Lock()
1429 fc := c.finalClosed
1430 c.Unlock()
1431 if !fc {
1432 ds.Close()
1433 }
1434 }
1435 }
1436
1437
1438
// debugGetPut enables extra bookkeeping in putConn: the stack of the
// last put of every connection is recorded in DB.lastPut and printed
// when a connection is returned twice.
const debugGetPut = false
1440
1441
1442
// putConn returns dc to the pool after use. err is the outcome of the
// last operation on dc; driver.ErrBadConn (or a failed validation, or an
// exceeded lifetime) causes the connection to be closed instead of
// reused. resetSession requests a session reset before the next use.
// It panics if dc is not currently checked out.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
	// Let the driver veto reuse unless the connection is already known
	// to be bad.
	if !errors.Is(err, driver.ErrBadConn) {
		if !dc.validateConnection(resetSession) {
			err = driver.ErrBadConn
		}
	}
	db.mu.Lock()
	if !dc.inUse {
		db.mu.Unlock()
		if debugGetPut {
			fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
		}
		panic("sql: connection returned that was never out")
	}

	// A connection past its lifetime is treated like a bad one.
	if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
		db.maxLifetimeClosed++
		err = driver.ErrBadConn
	}
	if debugGetPut {
		db.lastPut[dc] = stack()
	}
	dc.inUse = false
	dc.returnedAt = nowFunc()

	// Run the work that was deferred until the connection came back
	// (see noteUnusedDriverStatement).
	for _, fn := range dc.onPut {
		fn()
	}
	dc.onPut = nil

	if errors.Is(err, driver.ErrBadConn) {
		// Do not reuse a bad connection. numOpen is not decremented
		// here; finalClose does that when dc.Close completes. Pending
		// requests get a chance to have a replacement opened.
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		dc.Close()
		return
	}
	if putConnHook != nil {
		putConnHook(db, dc)
	}
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()

	// Neither a waiting request nor the free list took the connection.
	if !added {
		dc.Close()
		return
	}
}
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
1505 if db.closed {
1506 return false
1507 }
1508 if db.maxOpen > 0 && db.numOpen > db.maxOpen {
1509 return false
1510 }
1511 if c := len(db.connRequests); c > 0 {
1512 var req chan connRequest
1513 var reqKey uint64
1514 for reqKey, req = range db.connRequests {
1515 break
1516 }
1517 delete(db.connRequests, reqKey)
1518 if err == nil {
1519 dc.inUse = true
1520 }
1521 req <- connRequest{
1522 conn: dc,
1523 err: err,
1524 }
1525 return true
1526 } else if err == nil && !db.closed {
1527 if db.maxIdleConnsLocked() > len(db.freeConn) {
1528 db.freeConn = append(db.freeConn, dc)
1529 db.startCleanerLocked()
1530 return true
1531 }
1532 db.maxIdleClosed++
1533 }
1534 return false
1535 }
1536
1537
1538
1539
// maxBadConnRetries is how many times an operation is attempted with a
// possibly pooled connection while it keeps failing with
// driver.ErrBadConn, before a final attempt that insists on a new
// connection.
const maxBadConnRetries = 2
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550 func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1551 var stmt *Stmt
1552 var err error
1553 var isBadConn bool
1554 for i := 0; i < maxBadConnRetries; i++ {
1555 stmt, err = db.prepare(ctx, query, cachedOrNewConn)
1556 isBadConn = errors.Is(err, driver.ErrBadConn)
1557 if !isBadConn {
1558 break
1559 }
1560 }
1561 if isBadConn {
1562 return db.prepare(ctx, query, alwaysNewConn)
1563 }
1564 return stmt, err
1565 }
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575 func (db *DB) Prepare(query string) (*Stmt, error) {
1576 return db.PrepareContext(context.Background(), query)
1577 }
1578
1579 func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
1580
1581
1582
1583
1584
1585
1586 dc, err := db.conn(ctx, strategy)
1587 if err != nil {
1588 return nil, err
1589 }
1590 return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
1591 }
1592
1593
1594
1595
1596 func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
1597 var ds *driverStmt
1598 var err error
1599 defer func() {
1600 release(err)
1601 }()
1602 withLock(dc, func() {
1603 ds, err = dc.prepareLocked(ctx, cg, query)
1604 })
1605 if err != nil {
1606 return nil, err
1607 }
1608 stmt := &Stmt{
1609 db: db,
1610 query: query,
1611 cg: cg,
1612 cgds: ds,
1613 }
1614
1615
1616
1617
1618 if cg == nil {
1619 stmt.css = []connStmt{{dc, ds}}
1620 stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
1621 db.addDep(stmt, stmt)
1622 }
1623 return stmt, nil
1624 }
1625
1626
1627
1628 func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
1629 var res Result
1630 var err error
1631 var isBadConn bool
1632 for i := 0; i < maxBadConnRetries; i++ {
1633 res, err = db.exec(ctx, query, args, cachedOrNewConn)
1634 isBadConn = errors.Is(err, driver.ErrBadConn)
1635 if !isBadConn {
1636 break
1637 }
1638 }
1639 if isBadConn {
1640 return db.exec(ctx, query, args, alwaysNewConn)
1641 }
1642 return res, err
1643 }
1644
1645
1646
1647
1648
1649
1650 func (db *DB) Exec(query string, args ...any) (Result, error) {
1651 return db.ExecContext(context.Background(), query, args...)
1652 }
1653
1654 func (db *DB) exec(ctx context.Context, query string, args []any, strategy connReuseStrategy) (Result, error) {
1655 dc, err := db.conn(ctx, strategy)
1656 if err != nil {
1657 return nil, err
1658 }
1659 return db.execDC(ctx, dc, dc.releaseConn, query, args)
1660 }
1661
// execDC executes query with args on dc and calls release with the
// resulting error before returning. It first tries the driver's direct
// execution interface (driver.ExecerContext, falling back to
// driver.Execer); if the connection implements neither, or the driver
// answers driver.ErrSkip, the query is prepared, executed and closed as
// a one-off statement.
func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []any) (res Result, err error) {
	// err is a named result so the deferred release sees the final
	// error of whichever path returns.
	defer func() {
		release(err)
	}()
	execerCtx, ok := dc.ci.(driver.ExecerContext)
	var execer driver.Execer
	if !ok {
		execer, ok = dc.ci.(driver.Execer)
	}
	if ok {
		var nvdargs []driver.NamedValue
		var resi driver.Result
		// Argument conversion and execution both touch the driver
		// connection, so they run under dc's lock.
		withLock(dc, func() {
			nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
			if err != nil {
				return
			}
			resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
		})
		// driver.ErrSkip means "use the prepared-statement path".
		if err != driver.ErrSkip {
			if err != nil {
				return nil, err
			}
			return driverResult{dc, resi}, nil
		}
	}

	var si driver.Stmt
	withLock(dc, func() {
		si, err = ctxDriverPrepare(ctx, dc.ci, query)
	})
	if err != nil {
		return nil, err
	}
	ds := &driverStmt{Locker: dc, si: si}
	// Registered after the release defer, so the statement is closed
	// before the connection is released.
	defer ds.Close()
	return resultFromStatement(ctx, dc.ci, ds, args...)
}
1700
1701
1702
1703 func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
1704 var rows *Rows
1705 var err error
1706 var isBadConn bool
1707 for i := 0; i < maxBadConnRetries; i++ {
1708 rows, err = db.query(ctx, query, args, cachedOrNewConn)
1709 isBadConn = errors.Is(err, driver.ErrBadConn)
1710 if !isBadConn {
1711 break
1712 }
1713 }
1714 if isBadConn {
1715 return db.query(ctx, query, args, alwaysNewConn)
1716 }
1717 return rows, err
1718 }
1719
1720
1721
1722
1723
1724
1725 func (db *DB) Query(query string, args ...any) (*Rows, error) {
1726 return db.QueryContext(context.Background(), query, args...)
1727 }
1728
1729 func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
1730 dc, err := db.conn(ctx, strategy)
1731 if err != nil {
1732 return nil, err
1733 }
1734
1735 return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
1736 }
1737
1738
1739
1740
1741
// queryDC runs query with args on dc and returns the resulting Rows.
// On success, ownership of dc passes to the Rows, which stores
// releaseConn; on failure releaseConn is called here with the error.
// ctx governs the query; txctx is the context of the enclosing
// transaction, or nil when there is none (as passed by DB.query).
//
// It first tries the driver's direct query interface
// (driver.QueryerContext, falling back to driver.Queryer); if the
// connection implements neither, or the driver answers driver.ErrSkip,
// the query is prepared and run as a one-off statement.
func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []any) (*Rows, error) {
	queryerCtx, ok := dc.ci.(driver.QueryerContext)
	var queryer driver.Queryer
	if !ok {
		queryer, ok = dc.ci.(driver.Queryer)
	}
	if ok {
		var nvdargs []driver.NamedValue
		var rowsi driver.Rows
		var err error
		// Argument conversion and the query both touch the driver
		// connection, so they run under dc's lock.
		withLock(dc, func() {
			nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
			if err != nil {
				return
			}
			rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
		})
		// driver.ErrSkip means "use the prepared-statement path".
		if err != driver.ErrSkip {
			if err != nil {
				releaseConn(err)
				return nil, err
			}
			// The connection is not released here: the Rows keeps
			// releaseConn so it can give the connection back later.
			rows := &Rows{
				dc:          dc,
				releaseConn: releaseConn,
				rowsi:       rowsi,
			}
			rows.initContextClose(ctx, txctx)
			return rows, nil
		}
	}

	var si driver.Stmt
	var err error
	withLock(dc, func() {
		si, err = ctxDriverPrepare(ctx, dc.ci, query)
	})
	if err != nil {
		releaseConn(err)
		return nil, err
	}

	ds := &driverStmt{Locker: dc, si: si}
	rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
	if err != nil {
		ds.Close()
		releaseConn(err)
		return nil, err
	}

	// The one-off statement is handed to the Rows as closeStmt so it can
	// be closed together with the result set.
	rows := &Rows{
		dc:          dc,
		releaseConn: releaseConn,
		rowsi:       rowsi,
		closeStmt:   ds,
	}
	rows.initContextClose(ctx, txctx)
	return rows, nil
}
1805
1806
1807
1808
1809
1810
1811
1812 func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
1813 rows, err := db.QueryContext(ctx, query, args...)
1814 return &Row{rows: rows, err: err}
1815 }
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826 func (db *DB) QueryRow(query string, args ...any) *Row {
1827 return db.QueryRowContext(context.Background(), query, args...)
1828 }
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840 func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1841 var tx *Tx
1842 var err error
1843 var isBadConn bool
1844 for i := 0; i < maxBadConnRetries; i++ {
1845 tx, err = db.begin(ctx, opts, cachedOrNewConn)
1846 isBadConn = errors.Is(err, driver.ErrBadConn)
1847 if !isBadConn {
1848 break
1849 }
1850 }
1851 if isBadConn {
1852 return db.begin(ctx, opts, alwaysNewConn)
1853 }
1854 return tx, err
1855 }
1856
1857
1858
1859
1860
1861
1862 func (db *DB) Begin() (*Tx, error) {
1863 return db.BeginTx(context.Background(), nil)
1864 }
1865
1866 func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
1867 dc, err := db.conn(ctx, strategy)
1868 if err != nil {
1869 return nil, err
1870 }
1871 return db.beginDC(ctx, dc, dc.releaseConn, opts)
1872 }
1873
1874
// beginDC starts a transaction on the driver connection dc. release is
// called with the error if the driver fails to begin; otherwise it is stored
// in the returned Tx as its releaseConn.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
	var txi driver.Tx
	keepConnOnRollback := false
	withLock(dc, func() {
		// The connection is kept after a context-triggered rollback only if
		// the driver conn implements both SessionResetter and Validator.
		_, hasSessionResetter := dc.ci.(driver.SessionResetter)
		_, hasConnectionValidator := dc.ci.(driver.Validator)
		keepConnOnRollback = hasSessionResetter && hasConnectionValidator
		txi, err = ctxDriverBegin(ctx, opts, dc.ci)
	})
	if err != nil {
		release(err)
		return nil, err
	}

	// Derive a cancelable context for the transaction. tx.awaitDone blocks
	// on it and rolls back when it is done; Commit and rollback call cancel
	// after marking the Tx done.
	ctx, cancel := context.WithCancel(ctx)
	tx = &Tx{
		db:                 db,
		dc:                 dc,
		releaseConn:        release,
		txi:                txi,
		cancel:             cancel,
		keepConnOnRollback: keepConnOnRollback,
		ctx:                ctx,
	}
	go tx.awaitDone()
	return tx, nil
}
1904
1905
1906 func (db *DB) Driver() driver.Driver {
1907 return db.connector.Driver()
1908 }
1909
1910
1911
// ErrConnDone is returned by operations on a Conn whose done flag is already
// set, i.e. a Conn that has been closed (see Conn.grabConn and Conn.close).
var ErrConnDone = errors.New("sql: connection is already closed")
1913
1914
1915
1916
1917
1918
1919
1920
1921 func (db *DB) Conn(ctx context.Context) (*Conn, error) {
1922 var dc *driverConn
1923 var err error
1924 var isBadConn bool
1925 for i := 0; i < maxBadConnRetries; i++ {
1926 dc, err = db.conn(ctx, cachedOrNewConn)
1927 isBadConn = errors.Is(err, driver.ErrBadConn)
1928 if !isBadConn {
1929 break
1930 }
1931 }
1932 if isBadConn {
1933 dc, err = db.conn(ctx, alwaysNewConn)
1934 }
1935 if err != nil {
1936 return nil, err
1937 }
1938
1939 conn := &Conn{
1940 db: db,
1941 dc: dc,
1942 }
1943 return conn, nil
1944 }
1945
// releaseConn is the callback type returned by grabConn implementations.
// Callers invoke it with the operation's error once they are finished with
// the connection.
type releaseConn func(error)
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
// Conn wraps a single driver connection obtained from a DB (see DB.Conn).
// Once closed, its operations return ErrConnDone.
type Conn struct {
	db *DB

	// closemu is read-locked by grabConn for the duration of each operation
	// and write-locked by close, so close waits for in-flight operations
	// before releasing the driver connection.
	closemu sync.RWMutex

	// dc is the underlying driver connection. It is set when the Conn is
	// created and cleared by close.
	dc *driverConn

	// done is accessed atomically. It transitions from 0 to 1 exactly once,
	// in close; a non-zero value makes grabConn return ErrConnDone.
	done int32
}
1974
1975
1976
1977 func (c *Conn) grabConn(context.Context) (*driverConn, releaseConn, error) {
1978 if atomic.LoadInt32(&c.done) != 0 {
1979 return nil, nil, ErrConnDone
1980 }
1981 c.closemu.RLock()
1982 return c.dc, c.closemuRUnlockCondReleaseConn, nil
1983 }
1984
1985
1986 func (c *Conn) PingContext(ctx context.Context) error {
1987 dc, release, err := c.grabConn(ctx)
1988 if err != nil {
1989 return err
1990 }
1991 return c.db.pingDC(ctx, dc, release)
1992 }
1993
1994
1995
1996 func (c *Conn) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
1997 dc, release, err := c.grabConn(ctx)
1998 if err != nil {
1999 return nil, err
2000 }
2001 return c.db.execDC(ctx, dc, release, query, args)
2002 }
2003
2004
2005
2006 func (c *Conn) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
2007 dc, release, err := c.grabConn(ctx)
2008 if err != nil {
2009 return nil, err
2010 }
2011 return c.db.queryDC(ctx, nil, dc, release, query, args)
2012 }
2013
2014
2015
2016
2017
2018
2019
2020 func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
2021 rows, err := c.QueryContext(ctx, query, args...)
2022 return &Row{rows: rows, err: err}
2023 }
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033 func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2034 dc, release, err := c.grabConn(ctx)
2035 if err != nil {
2036 return nil, err
2037 }
2038 return c.db.prepareDC(ctx, dc, release, c, query)
2039 }
2040
2041
2042
2043
2044
2045
// Raw calls f with the underlying driver connection while holding the
// connection's mutex. The value passed to f should not be retained after f
// returns, since the mutex is released at that point.
//
// The error returned by f is returned from Raw and passed to the release
// callback. If f panics, driver.ErrBadConn is passed instead, which makes
// the release callback close the Conn.
func (c *Conn) Raw(f func(driverConn any) error) (err error) {
	var dc *driverConn
	var release releaseConn

	// grabConn takes a context only to satisfy stmtConnGrabber; the
	// argument is ignored, so nil is passed here.
	dc, release, err = c.grabConn(nil)
	if err != nil {
		return
	}
	fPanic := true
	dc.Mutex.Lock()
	defer func() {
		dc.Mutex.Unlock()

		// fPanic is still true here only if f did not return normally.
		// Hand an error to release in that case so the connection is not
		// kept in use.
		if fPanic {
			err = driver.ErrBadConn
		}
		release(err)
	}()
	err = f(dc.ci)
	fPanic = false

	return
}
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084 func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
2085 dc, release, err := c.grabConn(ctx)
2086 if err != nil {
2087 return nil, err
2088 }
2089 return c.db.beginDC(ctx, dc, release, opts)
2090 }
2091
2092
2093
2094 func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
2095 c.closemu.RUnlock()
2096 if errors.Is(err, driver.ErrBadConn) {
2097 c.close(err)
2098 }
2099 }
2100
2101 func (c *Conn) txCtx() context.Context {
2102 return nil
2103 }
2104
// close marks the Conn done and releases its driver connection with err.
// It returns ErrConnDone if the Conn was already closed, otherwise err.
func (c *Conn) close(err error) error {
	// Only the first caller gets past this point.
	if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {
		return ErrConnDone
	}

	// Take the write lock so that every operation holding the read lock
	// (see grabConn) has finished before the connection is released.
	c.closemu.Lock()
	defer c.closemu.Unlock()

	c.dc.releaseConn(err)
	c.dc = nil
	c.db = nil
	return err
}
2120
2121
2122
2123
2124
2125
2126 func (c *Conn) Close() error {
2127 return c.close(nil)
2128 }
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
// Tx is an in-progress database transaction, created by beginDC.
// It ends with Commit or Rollback, or with an automatic rollback when its
// context is done; afterwards its operations return ErrTxDone.
type Tx struct {
	db *DB

	// closemu is read-locked by grabConn for the duration of each operation
	// and briefly write-locked by Commit and rollback, so that they wait for
	// in-flight operations before touching the driver transaction.
	closemu sync.RWMutex

	// dc is the driver connection the transaction runs on and txi the
	// driver's transaction handle. Both are cleared by close.
	dc  *driverConn
	txi driver.Tx

	// releaseConn is called by close with the final error to give up dc.
	releaseConn func(error)

	// done is accessed atomically. It transitions from 0 to 1 exactly once,
	// in Commit or rollback; a non-zero value makes operations return
	// ErrTxDone.
	done int32

	// keepConnOnRollback is set by beginDC when the driver connection
	// implements both driver.SessionResetter and driver.Validator. When it
	// is false, awaitDone asks rollback to discard the connection.
	keepConnOnRollback bool

	// stmts records the statements created through this Tx so that
	// closePrepared can close them. The embedded mutex guards v.
	stmts struct {
		sync.Mutex
		v []*Stmt
	}

	// cancel cancels ctx. It is called by Commit and rollback once done
	// has been set.
	cancel func()

	// ctx is the transaction's context, derived in beginDC from the
	// caller's context. awaitDone blocks until it is done.
	ctx context.Context
}
2181
2182
2183
2184 func (tx *Tx) awaitDone() {
2185
2186
2187 <-tx.ctx.Done()
2188
2189
2190
2191
2192
2193
2194
2195 discardConnection := !tx.keepConnOnRollback
2196 tx.rollback(discardConnection)
2197 }
2198
2199 func (tx *Tx) isDone() bool {
2200 return atomic.LoadInt32(&tx.done) != 0
2201 }
2202
2203
2204
// ErrTxDone is returned by operations on a transaction whose done flag is
// already set, i.e. one that has been committed or rolled back.
var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
2206
2207
2208
2209
2210 func (tx *Tx) close(err error) {
2211 tx.releaseConn(err)
2212 tx.dc = nil
2213 tx.txi = nil
2214 }
2215
2216
2217
// hookTxGrabConn, when non-nil, is called by (*Tx).grabConn after it has
// taken the read lock and confirmed the transaction is not done.
// NOTE(review): presumably a test-only hook — confirm against the tests.
var hookTxGrabConn func()
2219
// grabConn returns the transaction's driver connection together with a
// release callback. It fails with ctx.Err() if ctx is already done and with
// ErrTxDone if the transaction has finished. On success closemu is
// read-locked and is unlocked by the returned callback.
func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
	// Non-blocking check of the caller's context.
	select {
	default:
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	}

	// The read lock must be taken before checking isDone so that the Tx
	// cannot be closed while an operation is using the connection.
	tx.closemu.RLock()
	if tx.isDone() {
		tx.closemu.RUnlock()
		return nil, nil, ErrTxDone
	}
	if hookTxGrabConn != nil {
		hookTxGrabConn()
	}
	return tx.dc, tx.closemuRUnlockRelease, nil
}
2239
2240 func (tx *Tx) txCtx() context.Context {
2241 return tx.ctx
2242 }
2243
2244
2245
2246
2247
2248 func (tx *Tx) closemuRUnlockRelease(error) {
2249 tx.closemu.RUnlock()
2250 }
2251
2252
2253 func (tx *Tx) closePrepared() {
2254 tx.stmts.Lock()
2255 defer tx.stmts.Unlock()
2256 for _, stmt := range tx.stmts.v {
2257 stmt.Close()
2258 }
2259 }
2260
2261
// Commit commits the transaction.
//
// If the transaction's context is already done, Commit returns ErrTxDone
// when the transaction has finished and the context's error otherwise.
// It also returns ErrTxDone if another caller finished the transaction first.
func (tx *Tx) Commit() error {
	// Check the context before flipping tx.done: doing it after the
	// CompareAndSwap would mark the transaction done without a COMMIT
	// having been issued.
	select {
	default:
	case <-tx.ctx.Done():
		if atomic.LoadInt32(&tx.done) == 1 {
			return ErrTxDone
		}
		return tx.ctx.Err()
	}
	if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
		return ErrTxDone
	}

	// Cancel the Tx context so that work bound to it stops and releases its
	// read lock on closemu. This is safe because tx.done has already moved
	// from 0 to 1. Acquiring and immediately dropping the write lock then
	// waits until no operation is still using the connection.
	tx.cancel()
	tx.closemu.Lock()
	tx.closemu.Unlock()

	var err error
	withLock(tx.dc, func() {
		err = tx.txi.Commit()
	})
	// Statements are closed unless the connection itself is bad.
	if !errors.Is(err, driver.ErrBadConn) {
		tx.closePrepared()
	}
	tx.close(err)
	return err
}
2296
// rollbackHook, when non-nil, is called by (*Tx).rollback right after the
// transaction has been marked done.
// NOTE(review): presumably a test-only hook — confirm against the tests.
var rollbackHook func()
2298
2299
2300
// rollback aborts the transaction. If discardConn is true the connection is
// released with driver.ErrBadConn regardless of the driver's rollback
// result, and that error is also what rollback returns.
// It returns ErrTxDone if the transaction has already finished.
func (tx *Tx) rollback(discardConn bool) error {
	if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
		return ErrTxDone
	}

	if rollbackHook != nil {
		rollbackHook()
	}

	// Cancel the Tx context so that work bound to it stops and releases its
	// read lock on closemu. This is safe because tx.done has already moved
	// from 0 to 1. Acquiring and immediately dropping the write lock then
	// waits until no operation is still using the connection.
	tx.cancel()
	tx.closemu.Lock()
	tx.closemu.Unlock()

	var err error
	withLock(tx.dc, func() {
		err = tx.txi.Rollback()
	})
	// Statements are closed unless the connection itself is bad.
	if !errors.Is(err, driver.ErrBadConn) {
		tx.closePrepared()
	}
	// Overriding err here makes close release the connection as bad.
	if discardConn {
		err = driver.ErrBadConn
	}
	tx.close(err)
	return err
}
2331
2332
2333 func (tx *Tx) Rollback() error {
2334 return tx.rollback(false)
2335 }
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347 func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2348 dc, release, err := tx.grabConn(ctx)
2349 if err != nil {
2350 return nil, err
2351 }
2352
2353 stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
2354 if err != nil {
2355 return nil, err
2356 }
2357 tx.stmts.Lock()
2358 tx.stmts.v = append(tx.stmts.v, stmt)
2359 tx.stmts.Unlock()
2360 return stmt, nil
2361 }
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372 func (tx *Tx) Prepare(query string) (*Stmt, error) {
2373 return tx.PrepareContext(context.Background(), query)
2374 }
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
// StmtContext returns a transaction-bound *Stmt for the query of an existing
// statement.
//
// Failures are not returned directly: the result is a *Stmt whose stickyErr
// is set, so the error surfaces when the statement is used or closed.
// The returned statement is recorded in tx.stmts so that it is closed when
// the transaction is committed or rolled back.
func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
	dc, release, err := tx.grabConn(ctx)
	if err != nil {
		return &Stmt{stickyErr: err}
	}
	defer release(nil)

	if tx.db != stmt.db {
		return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
	}
	var si driver.Stmt
	var parentStmt *Stmt
	stmt.mu.Lock()
	if stmt.closed || stmt.cg != nil {
		// The statement is closed or already bound to a connection grabber
		// (a Tx or Conn), so it cannot be reused on this connection.
		// Re-prepare its query directly instead; the result has no
		// parentStmt.
		stmt.mu.Unlock()
		withLock(dc, func() {
			si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
		})
		if err != nil {
			return &Stmt{stickyErr: err}
		}
	} else {
		stmt.removeClosedStmtLocked()
		// Reuse the driver statement if stmt has already been prepared on
		// this transaction's connection.
		for _, v := range stmt.css {
			if v.dc == dc {
				si = v.ds.si
				break
			}
		}

		stmt.mu.Unlock()

		if si == nil {
			// Not prepared on this connection yet: prepare it now, which
			// also records it in stmt.css.
			var ds *driverStmt
			withLock(dc, func() {
				ds, err = stmt.prepareOnConnLocked(ctx, dc)
			})
			if err != nil {
				return &Stmt{stickyErr: err}
			}
			si = ds.si
		}
		parentStmt = stmt
	}

	txs := &Stmt{
		db: tx.db,
		cg: tx,
		cgds: &driverStmt{
			Locker: dc,
			si:     si,
		},
		parentStmt: parentStmt,
		query:      stmt.query,
	}
	// Register the dependency on the parent; Stmt.Close removes it again.
	if parentStmt != nil {
		tx.db.addDep(parentStmt, txs)
	}
	tx.stmts.Lock()
	tx.stmts.v = append(tx.stmts.v, txs)
	tx.stmts.Unlock()
	return txs
}
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480 func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
2481 return tx.StmtContext(context.Background(), stmt)
2482 }
2483
2484
2485
2486 func (tx *Tx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
2487 dc, release, err := tx.grabConn(ctx)
2488 if err != nil {
2489 return nil, err
2490 }
2491 return tx.db.execDC(ctx, dc, release, query, args)
2492 }
2493
2494
2495
2496
2497
2498
2499 func (tx *Tx) Exec(query string, args ...any) (Result, error) {
2500 return tx.ExecContext(context.Background(), query, args...)
2501 }
2502
2503
2504 func (tx *Tx) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
2505 dc, release, err := tx.grabConn(ctx)
2506 if err != nil {
2507 return nil, err
2508 }
2509
2510 return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
2511 }
2512
2513
2514
2515
2516
2517 func (tx *Tx) Query(query string, args ...any) (*Rows, error) {
2518 return tx.QueryContext(context.Background(), query, args...)
2519 }
2520
2521
2522
2523
2524
2525
2526
2527 func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
2528 rows, err := tx.QueryContext(ctx, query, args...)
2529 return &Row{rows: rows, err: err}
2530 }
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541 func (tx *Tx) QueryRow(query string, args ...any) *Row {
2542 return tx.QueryRowContext(context.Background(), query, args...)
2543 }
2544
2545
// connStmt pairs a driver connection with the driver statement that was
// prepared on it. Stmt.css holds one entry per connection.
type connStmt struct {
	dc *driverConn
	ds *driverStmt
}
2550
2551
2552
// stmtConnGrabber is the source of the connection for a Stmt that is bound
// to a Tx or a Conn (Stmt.cg). Both *Tx and *Conn implement it.
type stmtConnGrabber interface {
	// grabConn returns the driver connection along with a callback that
	// must be invoked, with the operation's error, when the caller is done
	// with the connection.
	grabConn(context.Context) (*driverConn, releaseConn, error)

	// txCtx returns the transaction context when there is one, and nil
	// otherwise (Conn.txCtx always returns nil). The result is passed to
	// Rows.initContextClose.
	txCtx() context.Context
}
2563
// Compile-time checks that *Tx and *Conn satisfy stmtConnGrabber.
var (
	_ stmtConnGrabber = &Tx{}
	_ stmtConnGrabber = &Conn{}
)
2568
2569
2570
2571
2572
2573
2574
2575
2576
2577
// Stmt is a prepared statement. Its ExecContext, QueryContext and Close
// methods take locks internally (closemu and mu).
type Stmt struct {
	// db is the database the statement belongs to, query its SQL text, and
	// stickyErr an error captured when the statement was created. When
	// stickyErr is set, connStmt and Close return it.
	db        *DB
	query     string
	stickyErr error

	// closemu is read-locked while the statement is executing and
	// write-locked by Close, so Close waits for running executions.
	closemu sync.RWMutex

	// cg is non-nil when the statement is bound to a Tx or Conn; connStmt
	// then takes the connection from cg and uses cgds as the driver
	// statement. cgds is cleared by Close.
	cg   stmtConnGrabber
	cgds *driverStmt

	// parentStmt is set by Tx.StmtContext when this statement was derived
	// from an existing, reusable Stmt. In that case Close removes the
	// dependency on the parent instead of closing cgds, because the driver
	// statement is shared with the parent's css.
	parentStmt *Stmt

	// mu guards the fields below.
	mu     sync.Mutex
	closed bool

	// css holds the driver statements prepared so far, one per connection.
	// It is consulted when cg is nil, or when a Tx derives a statement from
	// this one. Entries for closed connections are pruned by
	// removeClosedStmtLocked.
	css []connStmt

	// lastNumClosed is the value of db.numClosed observed the last time
	// removeClosedStmtLocked pruned css.
	lastNumClosed uint64
}
2615
2616
2617
2618 func (s *Stmt) ExecContext(ctx context.Context, args ...any) (Result, error) {
2619 s.closemu.RLock()
2620 defer s.closemu.RUnlock()
2621
2622 var res Result
2623 strategy := cachedOrNewConn
2624 for i := 0; i < maxBadConnRetries+1; i++ {
2625 if i == maxBadConnRetries {
2626 strategy = alwaysNewConn
2627 }
2628 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2629 if err != nil {
2630 if errors.Is(err, driver.ErrBadConn) {
2631 continue
2632 }
2633 return nil, err
2634 }
2635
2636 res, err = resultFromStatement(ctx, dc.ci, ds, args...)
2637 releaseConn(err)
2638 if !errors.Is(err, driver.ErrBadConn) {
2639 return res, err
2640 }
2641 }
2642 return nil, driver.ErrBadConn
2643 }
2644
2645
2646
2647
2648
2649
2650 func (s *Stmt) Exec(args ...any) (Result, error) {
2651 return s.ExecContext(context.Background(), args...)
2652 }
2653
2654 func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (Result, error) {
2655 ds.Lock()
2656 defer ds.Unlock()
2657
2658 dargs, err := driverArgsConnLocked(ci, ds, args)
2659 if err != nil {
2660 return nil, err
2661 }
2662
2663 resi, err := ctxDriverStmtExec(ctx, ds.si, dargs)
2664 if err != nil {
2665 return nil, err
2666 }
2667 return driverResult{ds.Locker, resi}, nil
2668 }
2669
2670
2671
2672
2673
2674 func (s *Stmt) removeClosedStmtLocked() {
2675 t := len(s.css)/2 + 1
2676 if t > 10 {
2677 t = 10
2678 }
2679 dbClosed := atomic.LoadUint64(&s.db.numClosed)
2680 if dbClosed-s.lastNumClosed < uint64(t) {
2681 return
2682 }
2683
2684 s.db.mu.Lock()
2685 for i := 0; i < len(s.css); i++ {
2686 if s.css[i].dc.dbmuClosed {
2687 s.css[i] = s.css[len(s.css)-1]
2688 s.css = s.css[:len(s.css)-1]
2689 i--
2690 }
2691 }
2692 s.db.mu.Unlock()
2693 s.lastNumClosed = dbClosed
2694 }
2695
2696
2697
2698
// connStmt returns a driver connection on which to run the statement, the
// callback that releases that connection, and the driver statement prepared
// on it.
//
// It fails with s.stickyErr if set, and with an error if the statement is
// closed. On any error the connection has already been released (or was
// never obtained), so the caller must not call releaseConn.
func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
	if err = s.stickyErr; err != nil {
		return
	}
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		err = errors.New("sql: statement is closed")
		return
	}

	// A statement bound to a Tx or Conn must use that owner's connection
	// and the driver statement prepared for it.
	if s.cg != nil {
		s.mu.Unlock()
		dc, releaseConn, err = s.cg.grabConn(ctx)
		if err != nil {
			return
		}
		return dc, releaseConn, s.cgds, nil
	}

	s.removeClosedStmtLocked()
	s.mu.Unlock()

	dc, err = s.db.conn(ctx, strategy)
	if err != nil {
		return nil, nil, nil, err
	}

	// Reuse the driver statement if one was already prepared on dc.
	s.mu.Lock()
	for _, v := range s.css {
		if v.dc == dc {
			s.mu.Unlock()
			return dc, dc.releaseConn, v.ds, nil
		}
	}
	s.mu.Unlock()

	// Not prepared on this connection yet; prepare it now.
	withLock(dc, func() {
		ds, err = s.prepareOnConnLocked(ctx, dc)
	})
	if err != nil {
		dc.releaseConn(err)
		return nil, nil, nil, err
	}

	return dc, dc.releaseConn, ds, nil
}
2749
2750
2751
2752 func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) {
2753 si, err := dc.prepareLocked(ctx, s.cg, s.query)
2754 if err != nil {
2755 return nil, err
2756 }
2757 cs := connStmt{dc, si}
2758 s.mu.Lock()
2759 s.css = append(s.css, cs)
2760 s.mu.Unlock()
2761 return cs.ds, nil
2762 }
2763
2764
2765
// QueryContext executes the prepared statement with args and returns the
// resulting *Rows.
//
// The first maxBadConnRetries attempts use the cachedOrNewConn strategy and
// a final attempt uses alwaysNewConn. An attempt is retried only when it
// fails with driver.ErrBadConn; if every attempt does, driver.ErrBadConn is
// returned.
func (s *Stmt) QueryContext(ctx context.Context, args ...any) (*Rows, error) {
	s.closemu.RLock()
	defer s.closemu.RUnlock()

	var rowsi driver.Rows
	strategy := cachedOrNewConn
	for i := 0; i < maxBadConnRetries+1; i++ {
		if i == maxBadConnRetries {
			strategy = alwaysNewConn
		}
		dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
		if err != nil {
			if errors.Is(err, driver.ErrBadConn) {
				continue
			}
			return nil, err
		}

		rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...)
		if err == nil {
			// Ownership of the connection passes to the *Rows, which
			// releases it through rows.releaseConn when closed.
			rows := &Rows{
				dc:    dc,
				rowsi: rowsi,
				// releaseConn is set below.
			}
			// Register rows as a dependency of s; the dependency is
			// removed again by the release callback below.
			s.db.addDep(s, rows)

			// Wrap releaseConn so that releasing the connection also
			// removes the dependency registered above.
			rows.releaseConn = func(err error) {
				releaseConn(err)
				s.db.removeDep(s, rows)
			}
			var txctx context.Context
			if s.cg != nil {
				txctx = s.cg.txCtx()
			}
			rows.initContextClose(ctx, txctx)
			return rows, nil
		}

		releaseConn(err)
		if !errors.Is(err, driver.ErrBadConn) {
			return nil, err
		}
	}
	return nil, driver.ErrBadConn
}
2818
2819
2820
2821
2822
2823
2824 func (s *Stmt) Query(args ...any) (*Rows, error) {
2825 return s.QueryContext(context.Background(), args...)
2826 }
2827
2828 func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...any) (driver.Rows, error) {
2829 ds.Lock()
2830 defer ds.Unlock()
2831 dargs, err := driverArgsConnLocked(ci, ds, args)
2832 if err != nil {
2833 return nil, err
2834 }
2835 return ctxDriverStmtQuery(ctx, ds.si, dargs)
2836 }
2837
2838
2839
2840
2841
2842
2843
2844 func (s *Stmt) QueryRowContext(ctx context.Context, args ...any) *Row {
2845 rows, err := s.QueryContext(ctx, args...)
2846 if err != nil {
2847 return &Row{err: err}
2848 }
2849 return &Row{rows: rows}
2850 }
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866 func (s *Stmt) QueryRow(args ...any) *Row {
2867 return s.QueryRowContext(context.Background(), args...)
2868 }
2869
2870
// Close closes the statement. It waits for running executions (which hold
// closemu for reading) and is a no-op returning nil when the statement is
// already closed. A statement created with a sticky error returns that
// error.
func (s *Stmt) Close() error {
	s.closemu.Lock()
	defer s.closemu.Unlock()

	if s.stickyErr != nil {
		return s.stickyErr
	}
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil
	}
	s.closed = true
	// Detach the owner-bound driver statement while mu is held.
	txds := s.cgds
	s.cgds = nil

	s.mu.Unlock()

	// Not bound to a Tx or Conn: drop the statement's dependency on itself.
	if s.cg == nil {
		return s.db.removeDep(s, s)
	}

	if s.parentStmt != nil {
		// The driver statement is shared with the parent's css, so it must
		// not be closed here; only remove the dependency on the parent.
		return s.db.removeDep(s.parentStmt, s)
	}
	return txds.Close()
}
2900
2901 func (s *Stmt) finalClose() error {
2902 s.mu.Lock()
2903 defer s.mu.Unlock()
2904 if s.css != nil {
2905 for _, v := range s.css {
2906 s.db.noteUnusedDriverStatement(v.dc, v.ds)
2907 v.dc.removeOpenStmt(v.ds)
2908 }
2909 s.css = nil
2910 }
2911 return nil
2912 }
2913
2914
2915
// Rows is the result of a query. Next advances to the next row, Scan reads
// the current one, and Close releases the underlying connection.
type Rows struct {
	dc          *driverConn // connection the rows are read from; locked around driver calls
	releaseConn func(error) // called by close to give up dc
	rowsi       driver.Rows // the driver's row iterator
	cancel      func()      // set by initContextClose; called by close if non-nil
	closeStmt   *driverStmt // if non-nil, statement to close when the Rows are closed

	// closemu guards closed and lasterr. Next, NextResultSet, Err, Columns,
	// ColumnTypes and Scan take it for reading; close takes it for writing,
	// so a close cannot run while one of those holds the lock.
	closemu sync.RWMutex
	closed  bool
	lasterr error // last error from the driver or from close; io.EOF is not reported by Err

	// lastcols holds the values of the current row. It is allocated and
	// filled by nextLocked, read by Scan, and reset by NextResultSet.
	lastcols []driver.Value
}
2936
2937
2938
2939 func (rs *Rows) lasterrOrErrLocked(err error) error {
2940 if rs.lasterr != nil && rs.lasterr != io.EOF {
2941 return rs.lasterr
2942 }
2943 return err
2944 }
2945
2946
2947
// bypassRowsAwaitDone, when true, makes Rows.initContextClose return without
// starting the awaitDone goroutine.
// NOTE(review): presumably toggled only by tests — confirm.
var bypassRowsAwaitDone = false
2949
2950 func (rs *Rows) initContextClose(ctx, txctx context.Context) {
2951 if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) {
2952 return
2953 }
2954 if bypassRowsAwaitDone {
2955 return
2956 }
2957 ctx, rs.cancel = context.WithCancel(ctx)
2958 go rs.awaitDone(ctx, txctx)
2959 }
2960
2961
2962
2963
2964
2965 func (rs *Rows) awaitDone(ctx, txctx context.Context) {
2966 var txctxDone <-chan struct{}
2967 if txctx != nil {
2968 txctxDone = txctx.Done()
2969 }
2970 select {
2971 case <-ctx.Done():
2972 case <-txctxDone:
2973 }
2974 rs.close(ctx.Err())
2975 }
2976
2977
2978
2979
2980
2981
2982
2983 func (rs *Rows) Next() bool {
2984 var doClose, ok bool
2985 withLock(rs.closemu.RLocker(), func() {
2986 doClose, ok = rs.nextLocked()
2987 })
2988 if doClose {
2989 rs.Close()
2990 }
2991 return ok
2992 }
2993
2994 func (rs *Rows) nextLocked() (doClose, ok bool) {
2995 if rs.closed {
2996 return false, false
2997 }
2998
2999
3000
3001 rs.dc.Lock()
3002 defer rs.dc.Unlock()
3003
3004 if rs.lastcols == nil {
3005 rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
3006 }
3007
3008 rs.lasterr = rs.rowsi.Next(rs.lastcols)
3009 if rs.lasterr != nil {
3010
3011 if rs.lasterr != io.EOF {
3012 return true, false
3013 }
3014 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
3015 if !ok {
3016 return true, false
3017 }
3018
3019
3020
3021 if !nextResultSet.HasNextResultSet() {
3022 doClose = true
3023 }
3024 return doClose, false
3025 }
3026 return false, true
3027 }
3028
3029
3030
3031
3032
3033
3034
3035
3036
// NextResultSet advances to the next result set and reports whether there
// is one. It returns false, closing the Rows, when the driver does not
// support multiple result sets or returns an error; Err reports that error.
// It returns false without doing anything when the Rows are already closed.
func (rs *Rows) NextResultSet() bool {
	// Registered first so that it runs last, after both deferred unlocks
	// below: Close needs the write lock on closemu.
	var doClose bool
	defer func() {
		if doClose {
			rs.Close()
		}
	}()
	rs.closemu.RLock()
	defer rs.closemu.RUnlock()

	if rs.closed {
		return false
	}

	// Force nextLocked to allocate a fresh buffer sized for the new set.
	rs.lastcols = nil
	nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
	if !ok {
		doClose = true
		return false
	}

	// Driver calls are made with the connection locked.
	rs.dc.Lock()
	defer rs.dc.Unlock()

	rs.lasterr = nextResultSet.NextResultSet()
	if rs.lasterr != nil {
		doClose = true
		return false
	}
	return true
}
3070
3071
3072
3073 func (rs *Rows) Err() error {
3074 rs.closemu.RLock()
3075 defer rs.closemu.RUnlock()
3076 return rs.lasterrOrErrLocked(nil)
3077 }
3078
// errRowsClosed is the fallback error for operations on closed Rows, and
// errNoRows the fallback used by Columns and ColumnTypes when rowsi is nil.
// Both are passed through lasterrOrErrLocked, so a recorded non-EOF error
// takes precedence over them.
var errRowsClosed = errors.New("sql: Rows are closed")
var errNoRows = errors.New("sql: no Rows available")
3081
3082
3083
3084 func (rs *Rows) Columns() ([]string, error) {
3085 rs.closemu.RLock()
3086 defer rs.closemu.RUnlock()
3087 if rs.closed {
3088 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3089 }
3090 if rs.rowsi == nil {
3091 return nil, rs.lasterrOrErrLocked(errNoRows)
3092 }
3093 rs.dc.Lock()
3094 defer rs.dc.Unlock()
3095
3096 return rs.rowsi.Columns(), nil
3097 }
3098
3099
3100
3101 func (rs *Rows) ColumnTypes() ([]*ColumnType, error) {
3102 rs.closemu.RLock()
3103 defer rs.closemu.RUnlock()
3104 if rs.closed {
3105 return nil, rs.lasterrOrErrLocked(errRowsClosed)
3106 }
3107 if rs.rowsi == nil {
3108 return nil, rs.lasterrOrErrLocked(errNoRows)
3109 }
3110 rs.dc.Lock()
3111 defer rs.dc.Unlock()
3112
3113 return rowsColumnInfoSetupConnLocked(rs.rowsi), nil
3114 }
3115
3116
// ColumnType holds the name and driver-reported metadata of one column.
// It is populated by rowsColumnInfoSetupConnLocked.
type ColumnType struct {
	name string

	// The has* flags record whether the driver supplied the corresponding
	// value; they are returned as the ok result of the accessor methods.
	hasNullable       bool
	hasLength         bool
	hasPrecisionScale bool

	nullable     bool
	length       int64
	databaseType string
	precision    int64
	scale        int64
	scanType     reflect.Type
}
3131
3132
3133 func (ci *ColumnType) Name() string {
3134 return ci.name
3135 }
3136
3137
3138
3139
3140
3141
3142 func (ci *ColumnType) Length() (length int64, ok bool) {
3143 return ci.length, ci.hasLength
3144 }
3145
3146
3147
3148 func (ci *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
3149 return ci.precision, ci.scale, ci.hasPrecisionScale
3150 }
3151
3152
3153
3154
3155 func (ci *ColumnType) ScanType() reflect.Type {
3156 return ci.scanType
3157 }
3158
3159
3160
3161 func (ci *ColumnType) Nullable() (nullable, ok bool) {
3162 return ci.nullable, ci.hasNullable
3163 }
3164
3165
3166
3167
3168
3169
3170
3171 func (ci *ColumnType) DatabaseTypeName() string {
3172 return ci.databaseType
3173 }
3174
3175 func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType {
3176 names := rowsi.Columns()
3177
3178 list := make([]*ColumnType, len(names))
3179 for i := range list {
3180 ci := &ColumnType{
3181 name: names[i],
3182 }
3183 list[i] = ci
3184
3185 if prop, ok := rowsi.(driver.RowsColumnTypeScanType); ok {
3186 ci.scanType = prop.ColumnTypeScanType(i)
3187 } else {
3188 ci.scanType = reflect.TypeOf(new(any)).Elem()
3189 }
3190 if prop, ok := rowsi.(driver.RowsColumnTypeDatabaseTypeName); ok {
3191 ci.databaseType = prop.ColumnTypeDatabaseTypeName(i)
3192 }
3193 if prop, ok := rowsi.(driver.RowsColumnTypeLength); ok {
3194 ci.length, ci.hasLength = prop.ColumnTypeLength(i)
3195 }
3196 if prop, ok := rowsi.(driver.RowsColumnTypeNullable); ok {
3197 ci.nullable, ci.hasNullable = prop.ColumnTypeNullable(i)
3198 }
3199 if prop, ok := rowsi.(driver.RowsColumnTypePrecisionScale); ok {
3200 ci.precision, ci.scale, ci.hasPrecisionScale = prop.ColumnTypePrecisionScale(i)
3201 }
3202 }
3203 return list
3204 }
3205
3206
3207
3208
3209
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224
3225
3226
3227
3228
3229
3230
3231
3232
3233
3234
3235
3236
3237
3238
3239
3240
3241
3242
3243
3244
3245
3246
3247
3248
3249
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261
3262
3263
3264
3265
// Scan copies the columns of the current row into the values pointed at by
// dest. The number of values in dest must equal the number of columns.
//
// It returns the recorded error if a previous operation failed, an error if
// the Rows are closed or Next has not been called, and a wrapped conversion
// error naming the offending column index and name otherwise.
func (rs *Rows) Scan(dest ...any) error {
	rs.closemu.RLock()

	// A recorded error other than io.EOF takes precedence.
	if rs.lasterr != nil && rs.lasterr != io.EOF {
		rs.closemu.RUnlock()
		return rs.lasterr
	}
	if rs.closed {
		err := rs.lasterrOrErrLocked(errRowsClosed)
		rs.closemu.RUnlock()
		return err
	}
	// The read lock is dropped before the conversions below.
	rs.closemu.RUnlock()

	if rs.lastcols == nil {
		return errors.New("sql: Scan called without calling Next")
	}
	if len(dest) != len(rs.lastcols) {
		return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))
	}
	for i, sv := range rs.lastcols {
		err := convertAssignRows(dest[i], sv, rs)
		if err != nil {
			return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err)
		}
	}
	return nil
}
3294
3295
3296
// rowsCloseHook returns a function that Rows.close calls, if it is non-nil,
// with the Rows and a pointer to the error from closing the driver rows.
// The default returns nil, so nothing is called.
// NOTE(review): presumably replaced only by tests — confirm.
var rowsCloseHook = func() func(*Rows, *error) { return nil }
3298
3299
3300
3301
3302
3303 func (rs *Rows) Close() error {
3304 return rs.close(nil)
3305 }
3306
// close closes the Rows, recording err as the last error if none is set
// yet. It returns the error from closing the driver rows (possibly modified
// by the close hook), or nil if the Rows were already closed.
func (rs *Rows) close(err error) error {
	rs.closemu.Lock()
	defer rs.closemu.Unlock()

	if rs.closed {
		return nil
	}
	rs.closed = true

	// Keep an earlier error; otherwise remember why we are closing.
	if rs.lasterr == nil {
		rs.lasterr = err
	}

	// From here on err is the result of closing the driver rows.
	withLock(rs.dc, func() {
		err = rs.rowsi.Close()
	})
	if fn := rowsCloseHook(); fn != nil {
		fn(rs, &err)
	}
	// Stop the awaitDone goroutine, if one was started.
	if rs.cancel != nil {
		rs.cancel()
	}

	// Close the one-off statement, if any, before giving up the connection.
	if rs.closeStmt != nil {
		rs.closeStmt.Close()
	}
	rs.releaseConn(err)

	rs.lasterr = rs.lasterrOrErrLocked(err)
	return err
}
3338
3339
// Row is the result of a QueryRow-style call: either the error from running
// the query or the Rows to read a single row from.
type Row struct {
	// err is the deferred query error; Scan and Err return it. In the
	// constructors in this file rows is nil whenever err is set.
	err  error
	rows *Rows
}
3345
3346
3347
3348
3349
3350
3351 func (r *Row) Scan(dest ...any) error {
3352 if r.err != nil {
3353 return r.err
3354 }
3355
3356
3357
3358
3359
3360
3361
3362
3363
3364
3365
3366
3367
3368
3369 defer r.rows.Close()
3370 for _, dp := range dest {
3371 if _, ok := dp.(*RawBytes); ok {
3372 return errors.New("sql: RawBytes isn't allowed on Row.Scan")
3373 }
3374 }
3375
3376 if !r.rows.Next() {
3377 if err := r.rows.Err(); err != nil {
3378 return err
3379 }
3380 return ErrNoRows
3381 }
3382 err := r.rows.Scan(dest...)
3383 if err != nil {
3384 return err
3385 }
3386
3387 return r.rows.Close()
3388 }
3389
3390
3391
3392
3393
3394 func (r *Row) Err() error {
3395 return r.err
3396 }
3397
3398
// Result summarizes an executed statement. The implementation in this file,
// driverResult, forwards both methods to a driver.Result.
type Result interface {
	// LastInsertId returns the identifier reported by the driver for the
	// executed statement.
	// NOTE(review): meaning and availability are presumably
	// driver-specific — confirm against the driver.Result documentation.
	LastInsertId() (int64, error)

	// RowsAffected returns the row count reported by the driver for the
	// executed statement.
	// NOTE(review): availability is presumably driver-specific — confirm
	// against the driver.Result documentation.
	RowsAffected() (int64, error)
}
3412
// driverResult implements Result on top of a driver.Result. Every call to
// the driver is made while holding the embedded Locker.
type driverResult struct {
	sync.Locker // the Locker of the driver statement that produced resi
	resi        driver.Result
}
3417
3418 func (dr driverResult) LastInsertId() (int64, error) {
3419 dr.Lock()
3420 defer dr.Unlock()
3421 return dr.resi.LastInsertId()
3422 }
3423
3424 func (dr driverResult) RowsAffected() (int64, error) {
3425 dr.Lock()
3426 defer dr.Unlock()
3427 return dr.resi.RowsAffected()
3428 }
3429
// stack returns the calling goroutine's stack trace, truncated to at most
// 2 KiB.
func stack() string {
	var buf [2 << 10]byte
	n := runtime.Stack(buf[:], false)
	return string(buf[:n])
}
3434
3435
// withLock runs fn while holding lk. The lock is released even if fn
// panics.
func withLock(lk sync.Locker, fn func()) {
	lk.Lock()
	defer func() {
		lk.Unlock()
	}()
	fn()
}
3441
View as plain text