"""Token retrieval and customer lookup helpers.

Importing this module creates a pooled MySQL connection factory (``pool``),
loads ``config.ini`` from the directory containing this file, and exposes
:class:`DbController`.
"""

import json
import os
import time

import pymysql
import requests
from DBUtils.PooledDB import PooledDB

import logApi
from ini_op import Config

# NOTE(review): the database credentials are hard-coded here; consider moving
# them into config.ini next to the token settings.
# The second positional argument is passed straight to PooledDB (presumably
# ``mincached`` -- confirm against the installed DBUtils version).
pool = PooledDB(pymysql, 20, host='localhost', user='root', passwd='root',
                db='crt', port=3306, charset="utf8")

logger = logApi.logger

base_dir = os.path.dirname(os.path.abspath(__file__))
config = Config(os.path.join(base_dir, "config.ini"))
tokenApiUrl = config.get("baseconf", "tokenApiUrl")


class DbController:
    """Fetch API access tokens and look up customer business numbers."""

    def getToken(self):
        """Request an access token from the configured token endpoint.

        Posts the ``baseconf`` credentials from the config file, together
        with a millisecond timestamp, as JSON to ``tokenApiUrl``.

        Returns:
            The ``access_token`` from the response when its ``errcode`` is
            ``'0'``; otherwise an empty string. An empty string is also
            returned (and an error logged) when the request or the parsing
            of the response raises.
        """
        token = ""
        headers = {
            "Content-Type": "application/json",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        }
        param = {
            "grant_type": config.get("baseconf", "grant_type"),
            "client_id": config.get("baseconf", "client_id"),
            "client_secret": config.get("baseconf", "client_secret"),
            "tenant_id": config.get("baseconf", "tenant_id"),
            # Current time in whole milliseconds.
            "timestamp": int(round(time.time() * 1000)),
            "nonce": config.get("baseconf", "nonce"),
        }
        try:
            result = requests.post(tokenApiUrl, json=param, headers=headers,
                                   timeout=10)
            data = json.loads(result.text)
            # NOTE(review): success is detected by comparing with the string
            # '0'; a numeric 0 would not match -- confirm the API contract.
            if data["errcode"] == '0':
                token = data["data"]["access_token"]
        except Exception:
            # Best effort: callers receive "" and decide how to proceed.
            logger.error("获取token失败 ")
        return token

    def getBussinessNum(self, cerf_id):
        """Return the ``business_num`` of the newest row for ``cerf_id``.

        Queries ``querycustomer`` ordered by ``biz_id`` descending and reads
        only the first row.

        Args:
            cerf_id: Value matched against the ``cerf_id`` column. It is
                passed as a bound query parameter, never spliced into the
                SQL text.

        Returns:
            The ``business_num`` column of the first matching row, or an
            empty string when no row matches or the lookup raises (the
            failure is logged).
        """
        business_num = ""
        # Kept as locals (not attributes on self) so that calls sharing one
        # controller instance cannot close each other's connection.
        conn = None
        cursor = None
        try:
            conn = pool.connection()
            cursor = conn.cursor()
            sql = ("select business_num from querycustomer "
                   "where cerf_id=%s order by biz_id desc")
            cursor.execute(sql, (cerf_id,))
            row = cursor.fetchone()
            # fetchone() yields None when nothing matched; that is a normal
            # "not found" outcome, not an error.
            if row:
                business_num = row[0]
        except Exception:
            logger.error(" getBussinessNum error")
        finally:
            # Only release what was actually acquired; either may still be
            # None if acquiring the connection or cursor failed.
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()
        return business_num