-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathdataSource.go
320 lines (291 loc) · 10.5 KB
/
dataSource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package zorm
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
)
// dataSource wraps the native *sql.DB so that the rest of the package works
// with this type rather than with the database/sql object directly.
type dataSource struct {
	*sql.DB
	// config *DataSourceConfig
}
// newDataSource builds the package-internal dataSource described by config.
// It is unexported so that code outside the package does not construct a
// dataSource directly.
//
// Validation: config must be non-nil and DriverName and Dialect must be set.
// When config.SQLDB is nil a new pool is opened with
// sql.Open(config.DriverName, config.DSN), so DSN must be non-empty; otherwise
// the supplied config.SQLDB is used as the pool.
//
// Zero-valued pool settings are replaced in config itself with defaults
// (MaxOpenConns=50, MaxIdleConns=50, ConnMaxLifetimeSecond=600), applied to
// the pool, and the pool is then verified with Ping.
//
// Returns the wrapped pool, or a nil dataSource and an error when validation,
// sql.Open or Ping fails. Open and Ping failures are also reported through
// FuncLogError. After a failed Ping, a pool opened by this function is closed;
// a caller-supplied config.SQLDB is left open because this function did not
// create it.
func newDataSource(config *DataSourceConfig) (*dataSource, error) {
	if config == nil {
		return nil, errors.New("->newDataSource-->config cannot be nil")
	}
	if config.DriverName == "" {
		return nil, errors.New("->newDataSource-->DriverName cannot be empty")
	}
	/*
		// Disabled compatibility shim: DBType is being retired in favour of Dialect.
		if config.DBType != "" && config.Dialect == "" {
			FuncLogError(nil, errors.New("->newDataSource-->DataSourceConfig的DBType即将废弃,请使用Dialect属性"))
			config.Dialect = config.DBType
		}
	*/
	if config.Dialect == "" {
		return nil, errors.New("->newDataSource-->Dialect cannot be empty")
	}

	db := config.SQLDB
	// ownsDB is true when the pool is opened below rather than handed in by
	// the caller; only such a pool may be closed on the failure path.
	ownsDB := db == nil
	if ownsDB {
		// No existing database handle was supplied: initialise from the DSN.
		if config.DSN == "" {
			return nil, errors.New("->newDataSource-->DSN cannot be empty")
		}
		var errSQLOpen error
		db, errSQLOpen = sql.Open(config.DriverName, config.DSN)
		if errSQLOpen != nil {
			errSQLOpen = fmt.Errorf("->newDataSource-->open数据库打开失败:%w", errSQLOpen)
			FuncLogError(nil, errSQLOpen)
			return nil, errSQLOpen
		}
	}

	// Fill in defaults for unset pool settings. The defaults are written back
	// into config, so the caller can observe them afterwards.
	if config.MaxOpenConns == 0 {
		config.MaxOpenConns = 50
	}
	if config.MaxIdleConns == 0 {
		config.MaxIdleConns = 50
	}
	if config.ConnMaxLifetimeSecond == 0 {
		config.ConnMaxLifetimeSecond = 600
	}

	// Maximum number of open connections.
	db.SetMaxOpenConns(config.MaxOpenConns)
	// Maximum number of idle connections.
	db.SetMaxIdleConns(config.MaxIdleConns)
	// Connection lifetime in seconds. With the default of 600 (10 minutes) a
	// connection is destroyed and rebuilt before the database drops it on its
	// own and leaves a dead connection in the pool (MySQL's default
	// wait_timeout is 28800 seconds, i.e. 8 hours).
	db.SetConnMaxLifetime(time.Second * time.Duration(config.ConnMaxLifetimeSecond))

	// Verify that the database is reachable.
	if pingerr := db.Ping(); pingerr != nil {
		pingerr = fmt.Errorf("->newDataSource-->ping数据库失败:%w", pingerr)
		FuncLogError(nil, pingerr)
		if ownsDB {
			// Best-effort cleanup of the pool opened above; the ping error
			// is the one reported to the caller.
			_ = db.Close()
		}
		return nil, pingerr
	}
	return &dataSource{db}, nil
}
// Transaction reference: https://www.jianshu.com/p/2a144332c3db
//
// dataBaseConnection is a database session. It holds the native handles used
// to run statements either directly or inside a transaction.
type dataBaseConnection struct {
	// db is the native database handle.
	db *sql.DB
	// tx is the native transaction handle; it is nil while no transaction
	// is open on this session.
	tx *sql.Tx
	// config is the data source configuration.
	config *DataSourceConfig
}
// beginTx opens a transaction on this session.
//
// It is a no-op that returns nil when a transaction is already open. The
// transaction options (mainly the isolation level) come from the value stored
// in ctx under contextTxOptionsKey; when ctx carries no such value,
// config.DefaultTxOptions is used. A value of any type other than
// *sql.TxOptions results in nil options being passed to BeginTx.
//
// Returns a wrapped error when the underlying BeginTx call fails.
func (dbConnection *dataBaseConnection) beginTx(ctx context.Context) error {
	// Already inside a transaction: nothing to start.
	if dbConnection.tx != nil {
		return nil
	}

	var opts *sql.TxOptions
	switch fromCtx := ctx.Value(contextTxOptionsKey).(type) {
	case nil:
		// Nothing stored in the context: use the configured default.
		opts = dbConnection.config.DefaultTxOptions
	case *sql.TxOptions:
		opts = fromCtx
	}

	started, beginErr := dbConnection.db.BeginTx(ctx, opts)
	if beginErr != nil {
		return fmt.Errorf("->beginTx事务开启失败:%w", beginErr)
	}
	dbConnection.tx = started
	return nil
}
// rollback rolls back the transaction open on this session.
//
// It returns nil without doing anything when no transaction is open.
// Otherwise it calls Rollback on the native transaction and clears the
// session's transaction handle whether or not Rollback reports an error:
// database/sql treats a transaction as finished once Rollback has been
// called, so keeping the handle would make a later beginTx a no-op and every
// following statement fail with sql.ErrTxDone.
//
// Returns a wrapped error when the native Rollback fails.
func (dbConnection *dataBaseConnection) rollback() error {
	tx := dbConnection.tx
	if tx == nil {
		return nil
	}

	err := tx.Rollback()
	// The transaction is finished regardless of err; drop the stale handle.
	dbConnection.tx = nil
	if err != nil {
		return fmt.Errorf("->rollback事务回滚失败:%w", err)
	}
	return nil
}
// commit commits the transaction open on this session.
//
// Returns an error when no transaction is open, or a wrapped error when the
// native Commit fails. The session's transaction handle is cleared only after
// a successful commit.
func (dbConnection *dataBaseConnection) commit() error {
	current := dbConnection.tx
	if current == nil {
		return errors.New("->dbConnection.commit()事务为空")
	}
	if commitErr := current.Commit(); commitErr != nil {
		return fmt.Errorf("->dbConnection.commit()事务提交失败:%w", commitErr)
	}
	dbConnection.tx = nil
	return nil
}
// execContext executes a SQL statement. It runs inside the session's
// transaction when one is open and directly on the database handle otherwise.
//
// Before execution the statement and arguments pass through reBuildSQL
// (re-processes how parameters are substituted), reBuildUpdateSQL (noted in
// the original as handling ClickHouse-specific syntax for update statements)
// and wrapSQLHint (adds a hint before execution). An error from any of them
// is returned as is, with a nil result.
//
// Logging is driven by config.SlowSQLMillis:
//   - below 0: no SQL is printed;
//   - exactly 0: the SQL is printed before execution, without timing;
//   - above 0: execution is timed and the SQL is printed only when the
//     elapsed milliseconds reach that threshold.
//
// The returned pointer is non-nil even when the execution itself fails; the
// error then wraps the driver error together with the SQL text and values.
func (dbConnection *dataBaseConnection) execContext(ctx context.Context, sqlstr *string, argsValues *[]interface{}) (*sql.Result, error) {
	cfg := dbConnection.config

	stmt, params, err := reBuildSQL(ctx, cfg, sqlstr, argsValues)
	if err != nil {
		return nil, err
	}
	if err = reBuildUpdateSQL(ctx, cfg, stmt); err != nil {
		return nil, err
	}
	if err = wrapSQLHint(ctx, stmt); err != nil {
		return nil, err
	}

	threshold := cfg.SlowSQLMillis
	timed := threshold > 0
	var begin time.Time
	switch {
	case threshold == 0:
		FuncPrintSQL(ctx, *stmt, *params, 0)
	case timed:
		begin = time.Now()
	}

	var res sql.Result
	if tx := dbConnection.tx; tx != nil {
		res, err = tx.ExecContext(ctx, *stmt, *params...)
	} else {
		res, err = dbConnection.db.ExecContext(ctx, *stmt, *params...)
	}

	if timed {
		if elapsed := time.Since(begin).Milliseconds(); elapsed >= int64(threshold) {
			FuncPrintSQL(ctx, *stmt, *params, elapsed)
		}
	}

	if err != nil {
		err = fmt.Errorf("->execContext执行错误:%w,-->zormErrorExecSQL:%s,-->zormErrorSQLValues:%s", err, *stmt, sqlErrorValues2String(*params))
	}
	return &res, err
}
// queryRowContext runs a query that yields a single *sql.Row. It runs inside
// the session's transaction when one is open and directly on the database
// handle otherwise.
//
// The statement and arguments first pass through reBuildSQL (re-processes how
// parameters are substituted) and wrapSQLHint (adds a hint before execution);
// an error from either is returned with a nil row. Once the query has been
// issued the returned error is always nil.
//
// Logging is driven by config.SlowSQLMillis:
//   - below 0: no SQL is printed;
//   - exactly 0: the SQL is printed before the query, without timing;
//   - above 0: the call is timed and the SQL is printed only when the elapsed
//     milliseconds reach that threshold.
func (dbConnection *dataBaseConnection) queryRowContext(ctx context.Context, sqlstr *string, argsValues *[]interface{}) (*sql.Row, error) {
	stmt, params, err := reBuildSQL(ctx, dbConnection.config, sqlstr, argsValues)
	if err != nil {
		return nil, err
	}
	if err = wrapSQLHint(ctx, stmt); err != nil {
		return nil, err
	}

	threshold := dbConnection.config.SlowSQLMillis
	timed := threshold > 0
	var begin time.Time
	switch {
	case threshold == 0:
		FuncPrintSQL(ctx, *stmt, *params, 0)
	case timed:
		begin = time.Now()
	}

	var result *sql.Row
	if tx := dbConnection.tx; tx != nil {
		result = tx.QueryRowContext(ctx, *stmt, *params...)
	} else {
		result = dbConnection.db.QueryRowContext(ctx, *stmt, *params...)
	}

	if timed {
		if elapsed := time.Since(begin).Milliseconds(); elapsed >= int64(threshold) {
			FuncPrintSQL(ctx, *stmt, *params, elapsed)
		}
	}
	return result, nil
}
// queryContext runs a query that yields *sql.Rows. It runs inside the
// session's transaction when one is open and directly on the database handle
// otherwise.
//
// The statement and arguments first pass through reBuildSQL (re-processes how
// parameters are substituted) and wrapSQLHint (adds a hint before execution);
// an error from either is returned with nil rows.
//
// Logging is driven by config.SlowSQLMillis:
//   - below 0: no SQL is printed;
//   - exactly 0: the SQL is printed before the query, without timing;
//   - above 0: the call is timed and the SQL is printed only when the elapsed
//     milliseconds reach that threshold.
//
// When the query itself fails, the returned error wraps the driver error
// together with the SQL text and values.
func (dbConnection *dataBaseConnection) queryContext(ctx context.Context, sqlstr *string, argsValues *[]interface{}) (*sql.Rows, error) {
	stmt, params, err := reBuildSQL(ctx, dbConnection.config, sqlstr, argsValues)
	if err != nil {
		return nil, err
	}
	if err = wrapSQLHint(ctx, stmt); err != nil {
		return nil, err
	}

	threshold := dbConnection.config.SlowSQLMillis
	timed := threshold > 0
	var begin time.Time
	switch {
	case threshold == 0:
		FuncPrintSQL(ctx, *stmt, *params, 0)
	case timed:
		begin = time.Now()
	}

	var result *sql.Rows
	if tx := dbConnection.tx; tx != nil {
		result, err = tx.QueryContext(ctx, *stmt, *params...)
	} else {
		result, err = dbConnection.db.QueryContext(ctx, *stmt, *params...)
	}

	if timed {
		if elapsed := time.Since(begin).Milliseconds(); elapsed >= int64(threshold) {
			FuncPrintSQL(ctx, *stmt, *params, elapsed)
		}
	}

	if err != nil {
		err = fmt.Errorf("->queryContext执行错误:%w,-->zormErrorExecSQL:%s,-->zormErrorSQLValues:%s", err, *stmt, sqlErrorValues2String(*params))
	}
	return result, err
}
/*
// prepareContext 预执行,如果已经开启事务,就以事务方式执行,如果没有开启事务,就以非事务方式执行
// prepareContext Pre-execution,If the transaction has been opened,it will be executed in transaction mode,if the transaction is not opened,it will be executed in non-transactional mode
func (dbConnection *dataBaseConnection) prepareContext(ctx context.Context, query *string) (*sql.Stmt, error) {
//打印SQL
//print SQL
if dbConnection.config.PrintSQL {
//logger.Info("printSQL", logger.String("sql", query))
FuncPrintSQL(ctx,*query, nil)
}
if dbConnection.tx != nil {
return dbConnection.tx.PrepareContext(ctx, *query)
}
return dbConnection.db.PrepareContext(ctx, *query)
}
*/