diff --git a/sql/engines/mongo.py b/sql/engines/mongo.py index 8686dd4470..fb01995d5b 100644 --- a/sql/engines/mongo.py +++ b/sql/engines/mongo.py @@ -292,9 +292,8 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""): sql_len = len(sql) is_load = False # 默认不使用load方法执行mongodb sql语句 try: - if ( - not sql.startswith("var host=") and sql_len > 4000 - ): # 在master节点执行的情况,如果sql长度大于4000,就采取load js的方法 + if not sql.startswith("var host=") and sql_len > 4000: + # 在master节点执行的情况,如果sql长度大于4000,就采取load js的方法 # 因为用mongo load方法执行js脚本,所以需要重新改写一下sql,以便回显js执行结果 sql = "var result = " + sql + "\nprintjson(result);" # 因为要知道具体的临时文件位置,所以用了NamedTemporaryFile模块 @@ -303,77 +302,12 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""): ) fp.write(sql.encode("utf-8")) fp.seek(0) # 把文件指针指向开始,这样写的sql内容才能落到磁盘文件上 - if self.user and self.password: - cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF".format( - mongo=mongo, - uname=self.user, - password=self.password, - host=self.host, - port=self.port, - db_name=db_name, - sql=sql, - auth_db=auth_db, - slave_ok=slave_ok, - tempfile_=fp.name, - ) - else: - cmd = "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF".format( - mongo=mongo, - host=self.host, - port=self.port, - db_name=db_name, - sql=sql, - auth_db=auth_db, - slave_ok=slave_ok, - tempfile_=fp.name, - ) + cmd = self._build_cmd(db_name, auth_db, slave_ok, fp.name, is_load=True) is_load = True # 标记使用了load方法,用来在finally里面判断是否需要强制删除临时文件 - elif ( - not sql.startswith("var host=") and sql_len < 4000 - ): # 在master节点执行的情况, 如果sql长度小于4000,就直接用mongo shell执行,减少磁盘交换,节省性能 - if self.user and self.password: - cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF".format( - mongo=mongo, - uname=self.user, - password=self.password, - host=self.host, - port=self.port, - db_name=db_name, - sql=sql, - auth_db=auth_db, - slave_ok=slave_ok, - ) - else: - cmd = "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF".format( - mongo=mongo, - host=self.host, - port=self.port, - db_name=db_name, - sql=sql, - auth_db=auth_db, - slave_ok=slave_ok, - ) + elif not sql.startswith("var host=") and sql_len < 4000: # 在master节点执行的情况, 如果sql长度小于4000,就直接用mongo shell执行,减少磁盘交换,节省性能 + cmd = self._build_cmd(db_name, auth_db, slave_ok, sql=sql) else: - if self.user and self.password: - cmd = "{mongo} --quiet -u {user} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\nrs.slaveOk();{sql}\nEOF".format( - mongo=mongo, - user=self.user, - password=self.password, - host=self.host, - port=self.port, - db_name=db_name, - sql=sql, - auth_db=auth_db, - ) - else: - cmd = "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\nrs.slaveOk();{sql}\nEOF".format( - mongo=mongo, - host=self.host, - port=self.port, - db_name=db_name, - sql=sql, - auth_db=auth_db, - ) + cmd = self._build_cmd(db_name, auth_db, sql=sql, slave_ok="rs.slaveOk();") p = subprocess.Popen( cmd, shell=True, @@ -403,7 +337,68 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""): if is_load: fp.close() return msg - + # 用来进行判断是否有用户名与密码情况,进而返回要执行的mongo命令 + def _build_cmd(self, db_name, auth_db, slave_ok="", tempfile_=None, sql=None, is_load=False): + """构建命令行""" + if is_load: + if self.user and self.password: + return ( + "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\n" + "db=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF" + ).format( + mongo=mongo, + uname=self.user, + password=self.password, + host=self.host, + port=self.port, + db_name=db_name, + auth_db=auth_db, + slave_ok=slave_ok, + tempfile_=tempfile_, + ) + else: + return ( + "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\n" + "db=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF" + ).format( + mongo=mongo, + host=self.host, + port=self.port, + db_name=db_name, + auth_db=auth_db, + slave_ok=slave_ok, + tempfile_=tempfile_, + ) + else: + if self.user and self.password: + return ( + "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\n" + "db=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF" + ).format( + mongo=mongo, + uname=self.user, + password=self.password, + host=self.host, + port=self.port, + db_name=db_name, + auth_db=auth_db, + slave_ok=slave_ok, + sql=sql, + ) + else: + return ( + "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\n" + "db=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF" + ).format( + mongo=mongo, + host=self.host, + port=self.port, + db_name=db_name, + auth_db=auth_db, + slave_ok=slave_ok, + sql=sql, + ) + def get_master(self): """获得主节点的port和host"""