岚烟/spider

This repository has not declared an open-source license file (LICENSE). Before using it, check the project description and the upstream dependencies of its code.
common_spider.py 6.52 KB
岚烟 committed on 2024-03-12 17:53
import os
import random
import time
import csv
import pymysql
from PIL import Image
from io import BytesIO
import excel
import requests
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import configparser
from file_type import FileType
api_url = "http://v2.api.juliangip.com/dynamic/getips?auto_white=1&num=1&pt=1&result_type=text&split=1&trade_no=1826124793841176&sign=a6a3093e79d0c7753b06ce2c512512f5"
# Create the ConfigParser object and load db_config.ini up front, so get_html() can read the Cookie
config = configparser.ConfigParser()
config.read('db_config.ini')
# Fix one random User-Agent at import time; it must stay constant, otherwise the cookie becomes invalid
agent = UserAgent().random

class CommonSpider(object):
    def __init__(self, link=None, file=None, save_type=None):  # commonly used attributes
        self.link = link
        self.file = file
        self.save_type = save_type

    # IP proxy
    # http://demo.spiderpy.cn/get/  requests.get(api_url).json().get("proxy")
    def get_proxy(self):
        return requests.get(api_url).text

    def get_html(self, link, prox=True):  # fetch the page, using a random User-Agent
        ua = UserAgent()
        headers = {
            "User-Agent": ua.random  # ua.random rotates the header on every call
        }
        cookie = config.get("web", "Cookie")
        if cookie:
            headers = {
                "User-Agent": agent,  # note: the User-Agent must stay fixed, otherwise the cookie becomes invalid
                "Cookie": cookie
            }
        content = ''
        if prox:
            count = 0
            while True:
                count = count + 1
                try:
                    proxy = self.get_proxy()
                    print("Proxy IP: %s" % proxy)
                    content = requests.get(link, proxies={"http": "http://{}".format(proxy)}, headers=headers)
                    break
                except Exception as e:
                    print(e)
                    if count >= 3:  # give up after three failed proxy attempts
                        exit()
        else:
            content = requests.get(link, headers=headers)
        return content

    def parse_html(self, link, rank):  # parse the page with BeautifulSoup and extract the data
        content = self.get_html(link).text
        soup = BeautifulSoup(content, "html.parser")
        # collect every info block
        attrs = {"class": "info"}
        title_list = soup.findAll('div', attrs)
        num = rank
        items = []
        for titles in title_list:
            link = titles.find('a')
            attrs = {"class": "rating_num"}
            score = titles.find('span', attrs).string
            attrs = {"class": "title"}
            title = titles.find('span', attrs)
            item = str(num) + "." + title.getText() + ":" + link['href'] + "---" + score + "分"
            print(item)
            items.append(item)
            num = num + 1
        self.save_file(items)

    def clear_file(self):
        with open(self.file, 'w') as file:
            file.truncate(0)

    # download images in batch, in binary mode
    def save_img(self, image_urls):
        # read the configuration file
        config.read('db_config.ini')
        # fetch the configured value
        path = config.get('file', 'ImgDownloadPath')
        if len(path) == 0:
            # default directory for the downloaded images
            directory = os.path.join(os.getcwd(), "ImageDownload")
        else:
            directory = path
        # create the directory if it does not exist yet (a very common pattern)
        if not os.path.exists(directory):
            os.makedirs(directory)
        # walk through the list of image links
        for idx, url in enumerate(image_urls):
            content = self.get_html(url).content
            # open the image data with PIL
            image = Image.open(BytesIO(content))
            file_name = f'image_{idx}.jpg'  # use the index as the file name
            image.save(os.path.join(directory, file_name))
            print(f"Image {url} saved as {file_name}")

    def save_file(self, data):
        # append to self.file; clear_file() is responsible for truncating it before a fresh crawl
        save_type = self.save_type
        if save_type == FileType.XLSX:
            excel.write_excel_xlsx_append(self.file, data)
        else:
            # pass newline="" when writing line by line, otherwise blank lines appear between rows
            with open(self.file, 'a', newline="") as file:
                print(save_type)
                if save_type == FileType.CSV:
                    # delimiter sets the field separator (comma by default, a space here)
                    # quotechar sets the quoting character
                    # writerows writes the rows, passed in as a list
                    spamwriter = csv.writer(file, delimiter=' ', quotechar='|')
                    spamwriter.writerows(data)
                elif save_type == FileType.TXT:
                    file.write('\n'.join(data))

    # def write_html(self):  # save the extracted data as required: csv, MySQL database, etc.
    # save into MySQL
    def save_mysql(self):
        # read the configuration file
        config.read('db_config.ini')
        # fetch the connection settings
        host = config.get('db', 'host')
        user = config.get('db', 'user')
        database = config.get('db', 'database')
        password = config.get('db', 'password')
        db = pymysql.connect(host=host, user=user, password=password, database=database)
        cursor = db.cursor()
        # execute the SQL statement with a list of tuples
        info_list = [('我不是药神', '徐峥', '2018-07-05'), ('你好,李焕英', '贾玲', '2021-02-12')]
        sql = 'insert into filmtab values(%s,%s,%s)'
        # pass the parameters as a list
        cursor.executemany(sql, info_list)
        db.commit()
        # close the cursor and the connection
        cursor.close()
        db.close()
        # Example:
        # L = []
        # sql = 'insert into movieinfo values(%s,%s,%s)'
        # # prepare the data
        # for r in r_list:
        #     t = (
        #         r[0].strip(),
        #         r[1].strip()[3:],
        #         r[2].strip()[5:15]
        #     )
        #     L.append(t)
        # print(L)
        # # insert multiple rows at once, L: [(),(),()]
        # try:
        #     self.cursor.executemany(sql, L)
        #     # commit the data to the database
        #     self.db.commit()
        # except:
        #     # roll back on error
        #     self.db.rollback()

    def run(self):
        scrap_time = 0
        for start in range(0, 250, 25):
            scrap_time += 1
            s = self.link + '?start=' + str(start)
            print(s)
            self.parse_html(s, start + 1)
            sleep_time = random.randint(1, 3) + random.random()  # randint(1, 3) gives 1, 2 or 3; random() adds a fraction between 0 and 1
            print("Sleeping for", sleep_time, "seconds")
            time.sleep(sleep_time)
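
The class reads all of its runtime settings from a db_config.ini that is not shown on this page: a [web] section with a Cookie option, a [file] section with ImgDownloadPath, and a [db] section with the MySQL credentials. Since the actual file is not included, the snippet below is only a sketch that generates a skeleton with the same section and option names; every value is a placeholder.

# make_config.py - hypothetical helper that writes a db_config.ini skeleton
# matching the sections read by common_spider.py; all values are placeholders
import configparser

config = configparser.ConfigParser()
config['web'] = {'Cookie': ''}            # empty -> get_html() falls back to a random User-Agent
config['file'] = {'ImgDownloadPath': ''}  # empty -> save_img() uses ./ImageDownload
config['db'] = {
    'host': 'localhost',
    'user': 'root',
    'password': 'your_password',
    'database': 'spider',
}
with open('db_config.ini', 'w') as f:
    config.write(f)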
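
common_spider.py also imports two local modules that do not appear on this page: file_type (which provides FileType) and excel (which provides write_excel_xlsx_append). Their real contents are unknown; a minimal FileType consistent with how save_file() compares against XLSX, CSV and TXT might look like this sketch:

# file_type.py - minimal sketch only; the module actually shipped in the repository may differ
from enum import Enum

class FileType(Enum):
    TXT = 'txt'
    CSV = 'csv'
    XLSX = 'xlsx'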
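
save_mysql() inserts three-column rows (title, person, date) into a filmtab table whose schema is not included in the repository. A hypothetical one-off setup script, with guessed column names and types, could be:

# init_db.py - hypothetical setup for the filmtab table used by save_mysql();
# the real schema is not shown, so column names and types are assumptions
import pymysql

db = pymysql.connect(host='localhost', user='root', password='your_password', database='spider')
with db.cursor() as cursor:
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS filmtab (
            name VARCHAR(100),   -- film title
            star VARCHAR(100),   -- lead actor or director
            time DATE            -- release date
        ) DEFAULT CHARSET=utf8mb4
    """)
db.commit()
db.close()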
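
Finally, the 25-per-page offsets in run() together with the info, rating_num and title classes in parse_html() suggest a Douban Top 250 style list page. A minimal, hypothetical driver (the URL and output file name below are placeholders, not taken from the repository):

# run_spider.py - hypothetical usage sketch, not part of the repository
from common_spider import CommonSpider
from file_type import FileType

if __name__ == '__main__':
    spider = CommonSpider(
        link='https://movie.douban.com/top250',  # assumed list URL; run() appends ?start=0,25,...,225
        file='top250.txt',                       # output file
        save_type=FileType.TXT,
    )
    spider.clear_file()  # empty the output file before a fresh crawl
    spider.run()         # crawl every page and append the parsed rows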