7 Star 24 Fork 16

macroan/traderStock-gui

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
db.py 5.23 KB
一键复制 编辑 原始数据 按行查看 历史
macroan 提交于 2018-03-15 14:07 . no commit message
#coding=utf-8
import MySQLdb
import sqlalchemy
import pandas as pd
from sqlalchemy import create_engine
from MySQLdb.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
import warnings
warnings.filterwarnings(action="ignore", category=MySQLdb.Warning)
warnings.simplefilter(action='ignore', category=sqlalchemy.exc.SAWarning)
class DB:
"""
MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn()
释放连接对象;conn.close()或del conn
"""
#连接池对象
__pool = None
__instance = None
def __init__(self):
self.conn = self.connect()
def __del__(self):
self.conn.close()
@classmethod
def getInstance(cls):
if(cls.__instance == None):
cls.__instance = DB()
return cls.__instance
@staticmethod
def __getConn():
"""
@summary: 静态方法,从连接池中取出连接
@return MySQLdb.connection
"""
try:
if DB.__pool is None:
__pool = PooledDB(creator=MySQLdb, mincached=1 , maxcached=20 ,
host='127.0.0.1' , user='root' , passwd='123456' ,
db='stock',use_unicode=False,charset='utf8',cursorclass=DictCursor)
return __pool.connection()
except Exception,e:
print("db connect Exception", e)
def connect(self):
if not self.is_exist_db('stock'):
self.createdb('stock')
try:
conn = MySQLdb.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db ='stock')
return conn
except Exception,e:
print("db connect Exception", e)
def getConnect(self):
return self.conn
def reset_conn(self):
self.conn = self.connect()
def excute(self, sql, params):
try:
cur = self.conn.cursor()
cur.execute(sql, params)
except Exception, e:
print("excute sql error", e)
cur.close()
def get_df(self, tbname):
df = None
if self.is_exist_tb(tbname):
sql = "select * from %s;"%tbname
df = pd.read_sql(sql, self.conn)
df.sort_values('date', ascending=False, inplace=True)
return df
def is_db_empty(self):
sql = "show tables;"
cur = self.conn.cursor()
cur.execute(sql)
r = cur.fetchall()
if len(r) <= 1:
cur.close()
return True
cur.close()
return False
def is_exist_db(self, dbname):
conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123456',port=3306)
sql = "select SCHEMA_NAME from information_schema.SCHEMATA where SCHEMA_NAME=%s;"
cur = conn.cursor()
cur.execute(sql, (dbname,))
r = cur.fetchone()
if r != None:
cur.close()
return True
cur.close()
return False
def is_exist_tb(self, tbname):
sql = "select table_name FROM information_schema.TABLES WHERE table_name=%s;"
cur = self.conn.cursor()
cur.execute(sql, (tbname,))
r = cur.fetchone()
if r != None:
cur.close()
return True
cur.close()
return False
def createdb(self, dbname):
try:
conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='123456',port=3306)
cur = conn.cursor()
sql = 'create database if not exists %s default CHARSET utf8 COLLATE utf8_general_ci;'%dbname
cur.execute(sql)
conn.commit()
cur.close()
except MySQLdb.Error,e:
print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
def drop_tb(self, tbname):
cur = self.conn.cursor()
sql = "DROP TABLE IF EXISTS %s;"%tbname
cur.execute(sql)
cur.close()
self.conn.commit()
def to_sql(self, tbname, df, needindex=False):
if df is None:
return
engine = create_engine('mysql+mysqldb://root:[email protected]:3306/stock?charset=utf8')#用sqlalchemy创建引擎
#engine = create_engine('mysql://root:[email protected]/stock?charset=utf8')
#存入数据库
df.to_sql(tbname, engine, if_exists='replace', index=needindex,
dtype={df.index.names[0]:sqlalchemy.types.NVARCHAR(length=255)})
def commit(self):
self.conn.commit()
def close(self):
self.conn.close()
def get_basics(self, type=1):
tbname = 'basics' if type == 1 else 'us_basics'
sql = 'select * from '+tbname+';'
try:
basics = pd.read_sql(sql, self.conn, index_col='code')
except Exception, e:
print(e.message)
return None
else:
return basics
def find_delete_fristrecord(self, tbname):
cur = self.conn.cursor()
cur.callproc('delete_first_record', (tbname))
cur.close()
def find_delete_lastrecord(self, tbname, count):
cur = self.conn.cursor()
#cur.execute('call delete_last_record(%s, %d)', (str(tbname), count))
cur.callproc('delete_last_record', (tbname, count))
cur.close()
def delete_sockdata_by_date(self, tbname, date):
cur = self.conn.cursor()
try:
cur.callproc('delete_table_date', (tbname, date))
except Exception, e:
print(e)
cur.close()
#tabletype 1:普通数据,2:含有kdj, macd数据
def insert_data(self, tbname, datas):
sql1 = 'insert into %s '%tbname
sql2 = 'values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
sql = sql1 + sql2
cur = self.conn.cursor()
cur.executemany(sql, datas)
self.commit()
cur.close()
def delete_table(self):
cur = self.conn.cursor()
if self.is_exist_tb():
cur.callproc('delete_table()')
cur.close()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/macroan/traderStock-gui.git
[email protected]:macroan/traderStock-gui.git
macroan
traderStock-gui
traderStock-gui
master

搜索帮助