-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathora_sv.py
158 lines (128 loc) · 4.29 KB
/
ora_sv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# -*-coding:utf-8 -*
"""
Oracle DB services
"""
from __future__ import absolute_import
### import io
### import sys
### sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf8')
### from __future__ import unicode_literals
# Module metadata: version string and author name for this service module.
_version = "0.2.2"
_author = "mabotech"
import time
import traceback
from logging import getLogger
from nameko.timer import timer
from nameko.dependency_providers import Config
from nameko.rpc import rpc, RpcProxy
from nameko.constants import DEFAULT_HEARTBEAT
from nameko_oracle import Oracle
from constants import ORA_TYPE_MAP
# Module-level logger, registered under the name "ora_sv".
log = getLogger("ora_sv")
class OracleService:
    """Nameko service exposing Oracle queries and stored procedures via RPC.

    The service relies on two injected dependencies:

    * ``config`` -- the nameko ``Config`` provider; ``call_proc`` reads
      ``config["ORACLE"]["username"]`` and uses it as the schema name.
    * ``ora`` -- the ``nameko_oracle.Oracle`` provider, used here through
      ``execute``, ``get_rows``, ``get_ref_rows``, ``callproc`` and
      ``cursor.var``.
    """

    name = "oracle_service"

    config = Config()
    ora = Oracle()

    @timer(interval=DEFAULT_HEARTBEAT)
    def heartbeat(self):
        """Log the current local time at debug level on every timer tick."""
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        log.debug("heartbeat: {}".format(now))

    @staticmethod
    def _is_cursor_param(item):
        """Return True when *item* is a ``<CURSOR>`` output placeholder.

        Both the bare form ``"<CURSOR>"`` and the named form
        ``"pname:<CURSOR>"`` are recognised, mirroring how
        ``_prep_params`` inspects the last ``:``-separated field.
        """
        return isinstance(item, str) and item.split(":")[-1] == "<CURSOR>"

    def _prep_params(self, sp_name, params):
        """Convert caller parameters into stored-procedure arguments.

        A string parameter whose last ``:``-separated field is a key of
        ``ORA_TYPE_MAP`` (e.g. ``"<STRING>"`` or ``"pname:<STRING>"``) is
        treated as an output placeholder: it is replaced by
        ``self.ora.cursor.var(ORA_TYPE_MAP[...])``.  Every other value is
        passed through unchanged as an input parameter.

        Args:
            sp_name: Name of the stored procedure.  Currently unused; kept
                so the signature stays stable for existing callers.
            params: Positional parameters as supplied by the caller.

        Returns:
            A ``(sp_params, sp_params_names)`` tuple of equal-length lists.
            Names are the explicit ``pname`` prefix when one was given,
            ``"X<n>"`` for the n-th unnamed output parameter, and
            ``"P<i>"`` (1-based position) for input parameters.
        """
        sp_params = []
        sp_params_names = []
        out_count = 0  # number of output placeholders seen so far

        for position, item in enumerate(params, start=1):
            # Only strings can be placeholders; "pname:<TYPE>" or "<TYPE>".
            fields = item.split(":") if isinstance(item, str) else []

            if fields and fields[-1] in ORA_TYPE_MAP:
                out_count += 1
                sp_params.append(
                    self.ora.cursor.var(ORA_TYPE_MAP[fields[-1]]))
                if len(fields) > 1:
                    sp_params_names.append(fields[0])
                else:
                    sp_params_names.append("X{}".format(out_count))
            else:
                sp_params.append(item)
                sp_params_names.append("P{}".format(position))

        return sp_params, sp_params_names

    def _prep_result(self, result, params: list, sp_params_names: list,
                     is_dict: bool):
        """Collect the values returned by ``callproc`` into a list or dict.

        Values at positions whose original parameter was a ``<CURSOR>``
        placeholder (bare or named) are passed through
        ``self.ora.get_ref_rows``; all other values are copied as-is.

        Args:
            result: Sequence returned by ``self.ora.callproc``, indexed in
                step with ``params``.
            params: The caller's original parameter list.
            sp_params_names: Names produced by ``_prep_params``; only read
                when ``is_dict`` is ``True``.
            is_dict: When exactly ``True`` fill the dict, otherwise fill
                the list.

        Returns:
            A ``(data, data_d)`` tuple.  Only one of the two is populated:
            ``data_d`` (name -> value) when ``is_dict`` is ``True``,
            otherwise ``data`` (values in parameter order).
        """
        data = []
        data_d = {}

        for i, item in enumerate(params):
            value = result[i]
            if self._is_cursor_param(item):
                value = self.ora.get_ref_rows(value)

            if is_dict is True:
                data_d[sp_params_names[i]] = value
            else:
                data.append(value)

        return data, data_d

    @rpc
    def query(self, sql: str, args=None):
        """Execute *sql* and return what ``self.ora.get_rows()`` yields.

        TODO: add schema to sql?

        Args:
            sql: SQL statement handed to ``self.ora.execute``.
            args: Bind arguments for the statement; defaults to an empty
                dict when omitted.

        Raises:
            Exception: Wraps ``repr()`` of any underlying error after the
                statement and traceback have been logged.
        """
        if args is None:
            # Fresh dict per call; avoids a shared mutable default.
            args = {}

        try:
            self.ora.execute(sql, args)
            return self.ora.get_rows()
        except Exception as ex:
            log.error(sql)
            log.error(traceback.format_exc())
            raise Exception(repr(ex)) from ex

    @rpc
    def call_proc(self, sp_name: str, params: list, is_dict=True):
        """Call a stored procedure and return its parameter values.

        The procedure name is prefixed with the configured schema
        (``config["ORACLE"]["username"]``) unless its first dotted
        component already equals that schema, compared case-insensitively.

        Args:
            sp_name: Stored procedure name, optionally schema-qualified.
            params: Input values and output placeholders (number, string,
                datetime, timestamp, cursor, ...) in the format accepted
                by ``_prep_params``.
            is_dict: When exactly ``True`` return a dict keyed by
                parameter name, otherwise a list in parameter order.

        Returns:
            A dict or list with one entry per parameter, as built by
            ``_prep_result``.

        Raises:
            Exception: Wraps ``repr()`` of any error raised while calling
                the procedure or collecting its results.
        """
        schema = self.config.get("ORACLE")["username"]

        if sp_name.split(".")[0].lower() != schema.lower():
            # Not yet qualified: sp_name -> schema.sp_name
            sp_name = ".".join([schema, sp_name])

        log.debug(sp_name)
        log.debug(params)

        sp_params, sp_params_names = self._prep_params(sp_name, params)

        try:
            result = self.ora.callproc(sp_name, sp_params)
            data, data_d = self._prep_result(result, params, sp_params_names,
                                             is_dict)
            return data_d if is_dict is True else data
        except Exception as ex:
            # Log at error level, consistent with query().
            log.error(traceback.format_exc())
            raise Exception(repr(ex)) from ex