jackfrued/hellopc

example06.py
"""
基于Redis的多线程爬虫
"""
import hashlib
import io
import json
from concurrent.futures import ThreadPoolExecutor
from threading import local
from urllib.parse import urljoin, urlparse
import MySQLdb
import bs4
import redis
import requests

MAX_DEPTH = 2  # maximum crawl depth
seed_url = 'https://sports.sohu.com/'
netloc = urlparse(seed_url).netloc  # only links on this host are followed


def _force_bytes(data):
    if type(data) != bytes:
        if type(data) == str:
            data = data.encode()
        elif type(data) == io.BytesIO:
            data = data.getvalue()
        else:
            data = bytes(data)
    return data


def make_md5_digest(data):
    """Generate an MD5 digest (hex string) of the given data."""
    data = _force_bytes(data)
    return hashlib.md5(data).hexdigest()
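

# Per-thread Redis and MySQL connections are cached in thread-local storage,
# so worker threads in the pool never share connection objects.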
thread_local = local()


def get_redis_connection():
    if not hasattr(thread_local, 'redis_cli'):
        redis_cli = redis.Redis(host='47.104.31.138', port=54321, password='1Qaz2Wsx')
        setattr(thread_local, 'redis_cli', redis_cli)
    return getattr(thread_local, 'redis_cli')


def get_db_connection():
    if not hasattr(thread_local, 'conn'):
        conn = MySQLdb.connect(host='47.104.31.138', port=3306,
                               user='hellokitty', password='Hellokitty.618',
                               database='sohu', charset='utf8',
                               autocommit=True)
        setattr(thread_local, 'conn', conn)
    return getattr(thread_local, 'conn')


def write_to_db(url, content, digest):
    """Save the page URL, its content and the content digest into tb_page."""
    try:
        with get_db_connection().cursor() as cursor:
            cursor.execute(
                'insert into tb_page (url, content, digest) values (%s, %s, %s)',
                (url, content, digest)
            )
    except MySQLdb.MySQLError as err:
        print(err)


def fix_url(curr_url, href):
    """Normalize an anchor's href into an absolute, crawlable URL."""
    if href.startswith('//'):
        href = f'http:{href}'
    elif href.startswith('/'):
        href = urljoin(curr_url, href)
    elif href.startswith('mailto') or href.startswith('javascript') \
            or href.startswith('#') or href.startswith('?'):
        href = ''
    href = href.replace('!\'\'}', '')
    return urljoin('http://', href) if href else href


def fetch_page(curr_url, depth):
    """Download a page, persist it, and enqueue same-site links for further crawling."""
    redis_cli = get_redis_connection()
    try:
        resp = requests.get(curr_url, timeout=1)
        write_to_db(curr_url, resp.text, make_md5_digest(resp.text))
        soup = bs4.BeautifulSoup(resp.text, 'lxml')
        if depth < MAX_DEPTH:
            all_anchors = soup.find_all('a')
            for anchor in all_anchors:
                href = anchor.attrs.get('href', '')
                href = fix_url(curr_url, href)
                if urlparse(href).netloc == netloc:
                    redis_cli.rpush(
                        'spider:sohu:new_urls',
                        json.dumps({'href': href, 'depth': depth + 1})
                    )
    except Exception as ex:
        print(ex)
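

# Overall scheme: URLs waiting to be crawled are queued in the Redis list
# 'spider:sohu:new_urls', and URLs that have already been scheduled are recorded
# in the Redis set 'spider:sohu:visited_urls', so the crawl state lives outside
# the process and can in principle be shared by several workers.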


def main():
    redis_cli = get_redis_connection()
    # Seed the task queue with the start URL at depth 0.
    redis_cli.rpush(
        'spider:sohu:new_urls',
        json.dumps({'href': seed_url, 'depth': 0})
    )
    with ThreadPoolExecutor(max_workers=32) as t_pool:
        future = None
        # Keep going while there are queued URLs or the most recently
        # submitted task is still running.
        while redis_cli.llen('spider:sohu:new_urls') > 0 \
                or (future and not future.done()):
            data = redis_cli.lpop('spider:sohu:new_urls')
            if data:
                url_info = json.loads(data)
                curr_url, depth = url_info['href'], url_info['depth']
                if not redis_cli.sismember('spider:sohu:visited_urls', curr_url):
                    redis_cli.sadd('spider:sohu:visited_urls', curr_url)
                    future = t_pool.submit(fetch_page, curr_url, depth)


if __name__ == '__main__':
    main()