1 Star 1 Fork 0

kiligyss/flask_ftp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main.py 13.45 KB
一键复制 编辑 原始数据 按行查看 历史
kiligyss 提交于 2022-06-19 22:03 . 初始化
'''
Autor: Ruohong Yu
Date: 2022-04-19 10:22:40
LastEditors: Ruohong Yu
LastEditTime: 2022-04-27 22:19:15
Description: file content
FilePath: \merword:\program\p\pythonProject\FTP_Project\main.py
'''
# -*- coding: utf-8 -*-
import os
import sys
import re
import threading
import time
import shutil
from abc import ABCMeta, abstractmethod
from io import BytesIO
import function as fun
import xlwt
from flask import Flask, render_template, send_from_directory, flash, request, redirect, session, abort, make_response, send_file
from flask_apscheduler import APScheduler
from unicodedata import normalize
import sqlite3 as sql
app = Flask(__name__)
# app.config.from_object(__name__)
app.config['SECRET_KEY'] = 'YRH_SECRET_KEY'
#配置定时器
#单例模式
class Config(object):
SCHEDULER_API_ENABLED = True
scheduler = APScheduler()
rootpath = r'D:\program\p\pythonProject\FTP_Project\upload'
@app.route('/')
def wel():
conn = sql.connect('database.db')
# state=1 新增同步目录
# state=2 完成同步目录
# state=3 删除同步目录
# state=4 登录
# conn.execute('drop table oper')
conn.execute('create table if not exists oper (name TEXT, time TEXT,state TEXT)')
conn.commit()
conn.close()
return render_template('login.html')
class load():
def jud(self, curpath):
if os.path.isfile(curpath):
format = os.path.splitext(curpath)[1]
if format in ['.png', '.jpg', '.jpeg', '.gif']:
with open(curpath, 'rb') as f:
res = app.make_response(f.read())
res.headers['Content-Type'] = 'image'
return res
if format in ['.mp4']:
with open(curpath, 'rb') as f:
res = app.make_response(f.read())
res.headers['Content-Type'] = 'video'
return res
if format in ['.mp3']:
with open(curpath, 'rb') as f:
res = app.make_response(f.read())
res.headers['Content-Type'] = 'audio'
return res
# txt需要在界面ctrl+s手动下载
if format in ['.txt', '.xml', '.py', '.html']:
with open(curpath, 'rb') as f:
res = app.make_response(f.read())
res.headers['Content-Type'] = 'text/plain'
return res
if format in ['.pdf']:
with open(curpath, 'rb') as f:
res = app.make_response(f.read())
res.headers['Content-Type'] = 'application/pdf'
return res
return send_from_directory(os.path.split(curpath)[0],
os.path.split(curpath)[1],
as_attachment=True)
else:
return "notfile"
class op():
def jud(self, curpath, onepath):
filelist, dirlist = fun.dir_list(curpath)
if onepath:
path = onepath + '/'
else:
path = onepath
return render_template('home.html',
fileList=filelist,
dirList=dirlist,
path=path)
#工厂模式
class download_open(object):
def go( oper):
#下载文件
if oper == "download":
return load()
#打开文件夹
elif oper == "open":
return op()
@app.route('/home')
@app.route('/home/<path:path>',methods=['GET','POST'])
def home(path = ''):
onepath = path
curpath = os.path.join(rootpath, onepath)
if path == '':
session['pathHistory'] = ['']
#存储当前路径
session['currentPath'] = onepath
#存储历史路径
session['pathHistory'].append(onepath)
session['p'] = len(session['pathHistory']) - 1
yu = download_open.go('download').jud(curpath)
if yu == "notfile":
yu = download_open.go('open').jud(curpath,onepath)
return yu
@app.route('/up')
def up():
conn = sql.connect('database.db')
#conn.execute('drop table path')
conn.execute('create table if not exists path (name TEXT primary key, state TEXT,uptime TEXT,retime TEXT,ftpname TEXT)')
conn.commit()
pathlist = []
res = conn.execute('select * from path')
data = res.fetchall()
for i in range(len(data)):
pathlist.append({"name":data[i][0],"state":data[i][1],"uptime":data[i][2],"retime":data[i][3],"ftpname":data[i][4]})
conn.close()
return render_template('up.html',pathlist = pathlist)
#建造者模式
class Builder(metaclass=ABCMeta):
# 接口类,制定规范,只要此类必须要设置以下的被abstractmethod装饰的这些方法。
@abstractmethod
def go(self):
pass
#建造者模式
class up_oper(Builder):
def go(self):
conn = sql.connect('database.db')
srcpath = request.form['path']
now = int(time.time())
timeStruct = time.localtime(now)
nowtime_real = time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)
conn.execute('insert into oper (name,time,state) values (?,?,?)', (srcpath, nowtime_real, '1'))
conn.commit()
conn.close()
#建造者模式
class up_path(Builder):
def go(self):
conn = sql.connect('database.db')
srcpath = request.form['path']
now = int(time.time())
timeStruct = time.localtime(now)
nowtime_real = time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)
nowtime_noreal = time.strftime('%Y-%m-%d_%H-%M-%S', timeStruct)
conn.execute('insert into path (name,state,uptime,retime,ftpname) values (?,?,?,?,?)',
(srcpath, '已同步', nowtime_real, nowtime_real, nowtime_noreal))
conn.commit()
conn.close()
#建造者模式
class Director():
def __init__(self, my):
self.my = my
def update_my_sql(self):
self.my.go()
#建造者模式
@app.route('/upload',methods=['GET','POST'])
def upload():
p = up_path()
o = up_oper()
pp = Director(p)
oo = Director(o)
pp.update_my_sql()
oo.update_my_sql()
srcpath = request.form['path']
now = int(time.time())
timeStruct = time.localtime(now)
nowtime_noreal = time.strftime('%Y-%m-%d_%H-%M-%S', timeStruct)
dstpath = os.path.join(rootpath, nowtime_noreal)
shutil.copytree(srcpath, dstpath)
return {"code":200}
@app.route('/delpath',methods=['GET','POST'])
def delpath():
retime = request.form['time']
conn = sql.connect('database.db')
res = conn.execute('select * from path')
data = res.fetchall()
pathname =''
for i in range(len(data)):
if data[i][2] == retime:
pathname = data[i][0]
break
now = int(time.time())
timeStruct = time.localtime(now)
nowtime_real = time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)
conn.execute('insert into oper (name,time,state) values (?,?,?)', (pathname, nowtime_real, '3'))
# 删除语句参数后必须要带逗号,踩坑
conn.execute('delete from path where uptime = ?', (retime,))
conn.commit()
conn.close()
return {"code":200}
@app.route('/log')
def log():
conn = sql.connect('database.db')
res = conn.execute(' select * from oper')
data = res.fetchall()
operlist = []
for i in range(len(data)):
intro = ''
if data[i][2] == '1':
intro = '新增同步目录'
elif data[i][2] == '2':
intro = '完成同步目录'
elif data[i][2] == '3':
intro = '删除同步目录'
elif data[i][2] == '4':
intro = '用户登录成功'
operlist.append({"name":data[i][0],"time":data[i][1],"state":intro})
conn.close()
return render_template('log.html',operlist = operlist)
@app.route('/lastpath',methods=['GET','POST'])
def lastpath():
his = session['pathHistory']
session['pathHistory'].pop()
session['p'] = session['p'] - 1
p = session['p']
currentPath = session['currentPath']
upPath = currentPath.split('/')
upPath.pop(-1)
if len(upPath)!=0:
upPath = '/'.join(upPath)
upPath = '/'+upPath
return {"code":200,"data":upPath}
@app.route('/del',methods=['GET','POST'])
def dele():
res = dict(request.form)
res.pop('del')
currentPath = session['currentPath']
acurrentPath = os.path.join(rootpath, currentPath)
for filename in res:
path = os.path.join(acurrentPath, filename)
try:
if os.path.isfile(path):
os.remove(path)
else:
shutil.rmtree(path)
print("删除%s成功" % path)
if currentPath!='':
currentPath = '/'+currentPath
return redirect('/home%s' % currentPath)
except:
print("%s 删除失败,权限不足或者此文件不存在" % path)
#装饰器模式
@scheduler.task('interval', id='do_renew', seconds=15, misfire_grace_time=900)
def renew():
conn = sql.connect('database.db')
conn.commit()
pathlist = []
res = conn.execute('select * from path')
data = res.fetchall()
print("*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*")
now = int(time.time())
timeStruct = time.localtime(now)
nowtime_real = time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)
print(nowtime_real + "更新:")
for i in range(len(data)):
ft = data[i][4]
ftppath = os.path.join(rootpath,ft)
#如果误删了同步目录下的文件夹会自动生成一个新的文件夹
if os.path.exists(ftppath):
shutil.rmtree(ftppath)
shutil.copytree(data[i][0], ftppath)
now = int(time.time())
timeStruct = time.localtime(now)
nowtime_real = time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)
print("【"+str(i)+"】from:")
print(" "+data[i][0])
print("to:")
print(" "+ftppath)
print('')
conn.execute('insert into oper (name,time,state) values (?,?,?)', (data[i][0], nowtime_real, '2'))
conn.execute('update path set retime = ? where name = ? ',(nowtime_real,data[i][0]))
conn.commit()
conn.close()
@app.route('/register', methods=["POST", "GET"])
def register():
conn = sql.connect('database.db')
print("Opened database successfully")
# conn.execute('drop table user')
conn.execute('create table if not exists user (name TEXT primary key, password TEXT)')
conn.commit()
print("Table created successfully")
username = request.form['username']
password = request.form['password']
if request.method == 'POST':
try:
nm = request.form['username']
psw = request.form['password']
conn.execute("INSERT INTO user (name,password) VALUES (?,?)", (nm, psw))
conn.commit()
return {"code": 200, "msg": "成功"}
except:
conn.rollback()
return {"code": 400, "msg": "注册失败"}
finally:
conn.close()
@app.route('/login', methods=["POST", "GET"])
def login():
conn = sql.connect('database.db')
print("Opened database successfully")
username = request.form['username']
password = request.form['password']
if request.method == 'POST':
try:
res = conn.execute('select * from user')
userlist = res.fetchall()
print(userlist)
fla = 0
for i in range(0, len(userlist)):
if username == userlist[i][0] and password == userlist[i][1]:
fla = 1
now = int(time.time())
timeStruct = time.localtime(now)
nowtime_real = time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)
conn.execute('insert into oper (name,time,state) values (?,?,?)', (username, nowtime_real, '4'))
conn.commit()
return {"code": 200, "msg": "成功"}
if fla == 0:
return {"code": 400, "msg": "账户名或密码错误"}
except:
return {"code": 400, "msg": "账户名或密码错误"}
finally:
conn.close()
#不知道为什么,只能用wps打开
@app.route('/explog', methods=["POST", "GET"])
def listExcel():
conn = sql.connect('database.db')
cur = conn.cursor() # 获取游标
cur.execute('select * from oper') # 游标查询
data1 = cur.fetchall()
print(data1)
fields = ['名称', '时间', '操作'] # 设置自己需要的Excel表头
book = xlwt.Workbook(encoding='utf-8') # 获取excel对象
sheet = book.add_sheet('日志') # 设置excel的sheet名称
for col, field in enumerate(fields): # 写入excel表头
sheet.write(0, col, field)
row = 1
for data in data1: # 根据数据写入excel,col-单元格行标,field-单元格列标
for col, field in enumerate(data):
sheet.write(row, col, field)
row += 1
# 使用BytesIO将数据转为字节串在发送给浏览器
sio = BytesIO()
book.save(sio) # 将数据存储为bytes
sio.seek(0)
# response = make_response(sio.getvalue())
# response.headers['Content-type'] = 'application/vnd.ms-excel' # 响应头告诉浏览器发送的文件类型为excel
# response.headers['Content-Disposition'] = 'attachment; filename=data.xlsx' # 浏览器打开/保存的对话框,data.xlsx-设定的文件名
return send_file(sio,attachment_filename="testing.xlsx",as_attachment=True)
if __name__ == '__main__':
app.config.from_object(Config())
scheduler.init_app(app)
scheduler.start()
app.run(host='0.0.0.0', port=8080, debug=False)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/kiligyss/flask_ftp.git
[email protected]:kiligyss/flask_ftp.git
kiligyss
flask_ftp
flask_ftp
master

搜索帮助