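# NOTE: residue from the code-hosting web page ("Code fetch complete; the page will refresh automatically"), not part of the program.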
#coding=utf-8
import MySQLdb
import sqlalchemy
import pandas as pd
from sqlalchemy import create_engine
from MySQLdb.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
import warnings
# Install "ignore" filters for the MySQLdb.Warning and SQLAlchemy SAWarning
# categories as soon as this module is imported.
warnings.filterwarnings(action="ignore", category=MySQLdb.Warning)
warnings.simplefilter(action='ignore', category=sqlalchemy.exc.SAWarning)
class DB:
    """
    MySQL helper bound to the local ``stock`` database.

    Each instance owns one direct MySQLdb connection (``self.conn``) that is
    opened in ``__init__`` and closed by ``close()`` or when the object is
    garbage collected.  A lazily created connection pool is also available
    internally through the private static ``__getConn`` helper.

    None of the data-modifying helpers except ``drop_tb`` and ``insert_data``
    commit on their own; call ``commit()`` when a commit is required.
    """

    # Connection settings used by every method of this class.
    # NOTE(review): credentials are hard-coded, exactly as in the original.
    _HOST = '127.0.0.1'
    _PORT = 3306
    _USER = 'root'
    _PASSWD = '123456'
    _DB_NAME = 'stock'

    # Connection pool object, created on first use by __getConn().
    __pool = None
    # Shared instance handed out by getInstance().
    __instance = None

    def __init__(self):
        """Open the instance connection (may be None if connecting failed)."""
        self.conn = self.connect()

    def __del__(self):
        """Close the instance connection, tolerating a missing/closed one."""
        conn = getattr(self, 'conn', None)
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            # The connection may already have been closed through close();
            # a finalizer has nobody to report that to.
            pass

    @classmethod
    def getInstance(cls):
        """Return the shared DB instance, creating it on first call."""
        if cls.__instance is None:
            cls.__instance = DB()
        return cls.__instance

    @staticmethod
    def __getConn():
        """
        Take a connection out of the class-level pool.

        The pool is created on first use and stored on the class so that
        later calls reuse it.

        @return: a pooled connection, or None when pool creation or checkout
                 raised (the error is printed).
        """
        try:
            if DB.__pool is None:
                # Assign to the class attribute; a bare local name here would
                # rebuild the pool on every call.
                DB.__pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20,
                                     host=DB._HOST, user=DB._USER,
                                     passwd=DB._PASSWD, db=DB._DB_NAME,
                                     use_unicode=False, charset='utf8',
                                     cursorclass=DictCursor)
            return DB.__pool.connection()
        except Exception as e:
            print("db connect Exception", e)
            return None

    def connect(self):
        """
        Open a new direct connection to the ``stock`` database.

        The database is created first when it does not exist yet.

        @return: a MySQLdb connection, or None when connecting raised (the
                 error is printed).
        """
        if not self.is_exist_db(self._DB_NAME):
            self.createdb(self._DB_NAME)
        try:
            return MySQLdb.connect(host=self._HOST, port=self._PORT,
                                   user=self._USER, passwd=self._PASSWD,
                                   db=self._DB_NAME)
        except Exception as e:
            print("db connect Exception", e)
            return None

    def getConnect(self):
        """Return the connection currently held by this instance."""
        return self.conn

    def reset_conn(self):
        """Replace the instance connection with a freshly opened one."""
        self.conn = self.connect()

    def excute(self, sql, params):
        """
        Execute one parameterized statement on the instance connection.

        Errors raised while creating the cursor or executing are printed,
        not propagated.  Nothing is returned and nothing is committed.
        """
        cur = None
        try:
            cur = self.conn.cursor()
            cur.execute(sql, params)
        except Exception as e:
            print("excute sql error", e)
        finally:
            # cur stays None when cursor() itself failed.
            if cur is not None:
                cur.close()

    def get_df(self, tbname):
        """
        Load a whole table into a DataFrame sorted by ``date`` descending.

        @return: the DataFrame, or None when the table does not exist.
        """
        if not self.is_exist_tb(tbname):
            return None
        # Identifiers cannot be bound as query parameters, hence the string
        # formatting; tbname must come from a trusted source.
        sql = "select * from %s;" % tbname
        df = pd.read_sql(sql, self.conn)
        df.sort_values('date', ascending=False, inplace=True)
        return df

    def is_db_empty(self):
        """Return True when ``show tables`` lists at most one table."""
        cur = self.conn.cursor()
        try:
            cur.execute("show tables;")
            rows = cur.fetchall()
        finally:
            cur.close()
        return len(rows) <= 1

    def is_exist_db(self, dbname):
        """
        Tell whether a database named *dbname* exists on the server.

        Uses a temporary connection without a default database, which is
        closed before returning.
        """
        conn = MySQLdb.connect(host=self._HOST, user=self._USER,
                               passwd=self._PASSWD, port=self._PORT)
        try:
            cur = conn.cursor()
            try:
                sql = ("select SCHEMA_NAME from information_schema.SCHEMATA "
                       "where SCHEMA_NAME=%s;")
                cur.execute(sql, (dbname,))
                row = cur.fetchone()
            finally:
                cur.close()
        finally:
            conn.close()
        return row is not None

    def is_exist_tb(self, tbname):
        """
        Tell whether table *tbname* exists in the connection's database.

        The lookup is restricted to the current default schema so that a
        table with the same name in another database is not reported.
        """
        sql = ("select table_name FROM information_schema.TABLES "
               "WHERE table_schema=DATABASE() AND table_name=%s;")
        cur = self.conn.cursor()
        try:
            cur.execute(sql, (tbname,))
            row = cur.fetchone()
        finally:
            cur.close()
        return row is not None

    def createdb(self, dbname):
        """
        Create database *dbname* (utf8 / utf8_general_ci) if it is missing.

        MySQLdb errors are printed, not propagated.  The temporary
        connection is always closed.
        """
        conn = None
        try:
            conn = MySQLdb.connect(host=self._HOST, user=self._USER,
                                   passwd=self._PASSWD, port=self._PORT)
            cur = conn.cursor()
            try:
                sql = ('create database if not exists %s default CHARSET utf8 '
                       'COLLATE utf8_general_ci;' % dbname)
                cur.execute(sql)
                conn.commit()
            finally:
                cur.close()
        except MySQLdb.Error as e:
            print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        finally:
            if conn is not None:
                conn.close()

    def drop_tb(self, tbname):
        """Drop table *tbname* if it exists, then commit."""
        cur = self.conn.cursor()
        try:
            cur.execute("DROP TABLE IF EXISTS %s;" % tbname)
        finally:
            cur.close()
        self.conn.commit()

    def to_sql(self, tbname, df, needindex=False):
        """
        Write *df* to table *tbname*, replacing any existing table.

        Does nothing when *df* is None.  *needindex* is passed to pandas as
        ``index``.  The column named after the first index level is stored
        as NVARCHAR(255).
        """
        if df is None:
            return
        # The URL in the scraped source was mangled by e-mail obfuscation
        # ("[email protected]"); it is rebuilt here from the same settings the
        # rest of the class connects with.
        url = 'mysql+mysqldb://%s:%s@%s:%d/%s?charset=utf8' % (
            self._USER, self._PASSWD, self._HOST, self._PORT, self._DB_NAME)
        engine = create_engine(url)
        try:
            df.to_sql(tbname, engine, if_exists='replace', index=needindex,
                      dtype={df.index.names[0]: sqlalchemy.types.NVARCHAR(length=255)})
        finally:
            # A new engine is built per call; release its pooled connections.
            engine.dispose()

    def commit(self):
        """Commit the current transaction on the instance connection."""
        self.conn.commit()

    def close(self):
        """Close the instance connection."""
        self.conn.close()

    def get_basics(self, type=1):
        """
        Load the ``basics`` table (type == 1) or ``us_basics`` (otherwise),
        indexed by ``code``.

        @return: the DataFrame, or None when reading raised (the error is
                 printed).
        """
        tbname = 'basics' if type == 1 else 'us_basics'
        sql = 'select * from ' + tbname + ';'
        try:
            return pd.read_sql(sql, self.conn, index_col='code')
        except Exception as e:
            # e.message is empty for multi-argument exceptions and does not
            # exist on Python 3; print the exception itself.
            print(e)
            return None

    def find_delete_fristrecord(self, tbname):
        """Call stored procedure ``delete_first_record(tbname)``."""
        cur = self.conn.cursor()
        try:
            # The argument list must be a sequence; "(tbname)" is just a str.
            cur.callproc('delete_first_record', (tbname,))
        finally:
            cur.close()

    def find_delete_lastrecord(self, tbname, count):
        """Call stored procedure ``delete_last_record(tbname, count)``."""
        cur = self.conn.cursor()
        try:
            cur.callproc('delete_last_record', (tbname, count))
        finally:
            cur.close()

    def delete_sockdata_by_date(self, tbname, date):
        """
        Call stored procedure ``delete_table_date(tbname, date)``.

        Errors raised by the call are printed, not propagated.
        """
        cur = self.conn.cursor()
        try:
            cur.callproc('delete_table_date', (tbname, date))
        except Exception as e:
            print(e)
        finally:
            cur.close()

    # NOTE(review): a legacy comment here described a "tabletype" (1: plain
    # data, 2: data including kdj/macd columns); no such parameter exists.
    def insert_data(self, tbname, datas):
        """
        Bulk-insert *datas* into *tbname* with ``executemany`` and commit.

        The statement carries a fixed list of ``%s`` placeholders, so every
        row in *datas* must supply exactly that many values.
        """
        sql1 = 'insert into %s ' % tbname
        sql2 = 'values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
        sql = sql1 + sql2
        cur = self.conn.cursor()
        try:
            cur.executemany(sql, datas)
            self.commit()
        finally:
            cur.close()

    def delete_table(self, tbname=None):
        """
        Call stored procedure ``delete_table``.

        NOTE(review): the original called ``is_exist_tb()`` without a table
        name (always a TypeError) and passed ``'delete_table()'`` as the
        procedure name.  Which table was meant cannot be told from this
        file, so an optional *tbname* was added: when given, the procedure
        only runs if that table exists; when omitted, it runs
        unconditionally.  Confirm against the stored procedure.
        """
        if tbname is not None and not self.is_exist_tb(tbname):
            return
        cur = self.conn.cursor()
        try:
            cur.callproc('delete_table')
        finally:
            cur.close()
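# NOTE: residue from the code-hosting web page, not part of the program: "This location may contain content unsuitable for display, so the page does not show it. You can review and modify it with the relevant editing functions."
# "If you confirm the content involves no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content violating national laws and regulations, you can click submit to appeal and we will handle it as soon as possible."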