Commit aea13852 authored by 时海鑫

test1

parent 3d357825
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
微信公众号文章爬虫工具
版本: 1.5
作者: CAN
功能: 爬取指定公众号的历史文章列表及内容
"""
import os
import re
import json
import time
import random
import argparse
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
# 伪装头部列表 - 随机选择防止被封
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15',
'Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Mobile/15E148 Safari/604.1',
'Mozilla/5.0 (Linux; Android 13; SM-S901U) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.196 Mobile Safari/537.36'
]
class WeChatCrawler:
def __init__(self, biz_id=None, nickname=None, cookie=None, output_dir="output", delay=2):
"""
初始化爬虫
:param biz_id: 公众号biz ID (可选)
:param nickname: 公众号昵称 (可选)
:param cookie: 微信Cookie (必需)
:param output_dir: 输出目录
:param delay: 请求延迟(秒)
"""
self.session = requests.Session()
self.cookie = cookie
self.output_dir = output_dir
self.delay = delay
self.biz_id = biz_id
self.nickname = nickname
self.article_count = 0
self.failed_urls = []
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
os.makedirs(os.path.join(output_dir, "articles"), exist_ok=True)
# 设置会话Cookie
if cookie:
self.session.headers.update({'Cookie': cookie})
def get_random_headers(self):
"""生成随机请求头"""
return {
'User-Agent': random.choice(USER_AGENTS),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Referer': 'https://mp.weixin.qq.com/'
}
def search_public_account(self, nickname):
"""通过昵称搜索公众号获取biz_id"""
print(f"[*] 正在搜索公众号: {nickname}")
search_url = "https://mp.weixin.qq.com/cgi-bin/searchbiz"
params = {
'action': 'search_biz',
'lang': 'zh_CN',
'f': 'json',
'ajax': '1',
'random': random.random(),
'query': nickname,
'begin': '0',
'count': '5'
}
try:
headers = self.get_random_headers()
response = self.session.get(search_url, params=params, headers=headers)
response.raise_for_status()
data = response.json()
if 'list' not in data:
print(f"[-] 搜索失败,响应: {data}")
return None
accounts = data['list']
if not accounts:
print(f"[-] 未找到昵称为 '{nickname}' 的公众号")
return None
# 选择最匹配的结果
for acc in accounts:
if acc['nickname'].lower() == nickname.lower():
print(f"[+] 找到公众号: {acc['nickname']} (biz: {acc['fakeid']})")
return acc['fakeid']
# 返回第一个结果
first_acc = accounts[0]
print(f"[+] 找到相似公众号: {first_acc['nickname']} (biz: {first_acc['fakeid']})")
return first_acc['fakeid']
except Exception as e:
print(f"[-] 搜索失败: {str(e)}")
return None
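    # Note (illustrative, not from any API documentation): the code above only relies on
    # the 'list', 'nickname' and 'fakeid' fields of the searchbiz response, i.e. roughly
    #   {"list": [{"fakeid": "MzA5...", "nickname": "示例公众号", ...}], ...}
    # Any other fields in the response are ignored here.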
def get_article_list(self, biz_id, count=10):
"""获取公众号文章列表"""
print(f"[*] 正在获取文章列表 (biz: {biz_id})")
article_list = []
offset = 0
while len(article_list) < count:
url = "https://mp.weixin.qq.com/cgi-bin/appmsg"
params = {
'action': 'list_ex',
'begin': str(offset),
'count': '5',
'fakeid': biz_id,
'type': '9',
'query': '',
'token': self.get_token(),
'lang': 'zh_CN',
'f': 'json',
'ajax': '1'
}
try:
headers = self.get_random_headers()
response = self.session.get(url, params=params, headers=headers)
response.raise_for_status()
data = response.json()
if 'app_msg_list' not in data:
print(f"[-] 获取文章列表失败,响应: {data}")
break
articles = data['app_msg_list']
if not articles:
print("[+] 已获取所有文章")
break
for article in articles:
if len(article_list) >= count:
break
article_list.append({
'title': article['title'],
'url': article['link'],
'publish_time': article['create_time'],
'cover': article['cover'],
'digest': article['digest']
})
offset += len(articles)
print(f"[+] 已获取 {len(article_list)}/{count} 篇文章")
time.sleep(self.delay + random.uniform(0, 1))
except Exception as e:
print(f"[-] 获取文章列表出错: {str(e)}")
break
return article_list
def get_token(self):
"""获取随机token值 (简化实现)"""
return str(int(time.time() * 1000))
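    # The timestamp above is only a placeholder. The appmsg/searchbiz endpoints normally
    # expect the token issued to the logged-in session; the helper below is a sketch of
    # recovering it, assuming the supplied Cookie belongs to a logged-in mp.weixin.qq.com
    # session whose home page URL carries a token= query parameter. It is not called by
    # default and is provided for reference only.
    def get_session_token(self):
        """Try to parse the real session token from the logged-in home page URL (sketch)."""
        try:
            response = self.session.get("https://mp.weixin.qq.com/",
                                        headers=self.get_random_headers())
            # After login, mp.weixin.qq.com redirects to a URL containing token=<digits>
            match = re.search(r'token=(\d+)', response.url)
            if match:
                return match.group(1)
        except Exception as e:
            print(f"[-] Failed to resolve session token: {str(e)}")
        # Fall back to the simplified placeholder implementation above
        return self.get_token()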
def extract_real_url(self, url):
"""提取微信文章真实URL"""
try:
# 处理微信跳转URL
if 'mp.weixin.qq.com/s?' not in url:
return url
headers = self.get_random_headers()
response = requests.get(url, headers=headers, allow_redirects=False)
# 检查重定向
if 300 <= response.status_code < 400:
location = response.headers.get('Location')
if location:
return location
# 从HTML中提取真实URL
soup = BeautifulSoup(response.text, 'html.parser')
meta = soup.find('meta', attrs={'property': 'og:url'})
if meta and meta.get('content'):
return meta.get('content')
# 尝试从JS中提取
pattern = re.compile(r'var\s+msg_link\s*=\s*"([^"]+)"')
match = pattern.search(response.text)
if match:
return match.group(1)
return url
except Exception as e:
print(f"[-] 提取真实URL失败: {str(e)}")
return url
def parse_article(self, url):
"""解析文章内容"""
try:
real_url = self.extract_real_url(url)
print(f"[*] 解析文章: {real_url}")
headers = self.get_random_headers()
response = requests.get(real_url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 提取文章元信息
            title_tag = soup.find('h1', id='activity-name')
            title = title_tag.get_text().strip() if title_tag else "无标题"
            author_tag = soup.find('span', id='js_name')
            author = author_tag.get_text().strip() if author_tag else "未知作者"
            time_tag = soup.find('em', id='publish_time')
            publish_time = time_tag.get_text().strip() if time_tag else ""
# 提取正文内容
content_div = soup.find('div', id='js_content')
if not content_div:
return None
# 清理无用元素
for elem in content_div.find_all(['script', 'style', 'iframe']):
elem.decompose()
# 处理图片 - 替换为Markdown格式
for img in content_div.find_all('img'):
if img.get('data-src'):
img.replace_with(f"\n![图片]({img['data-src']})\n")
elif img.get('src'):
img.replace_with(f"\n![图片]({img['src']})\n")
# 获取文本内容
content = content_div.get_text().strip()
# 获取阅读数和点赞数
            read_tag = soup.find('span', class_='read_num')
            read_num = read_tag.get_text().strip() if read_tag else "N/A"
            like_tag = soup.find('span', class_='like_num')
            like_num = like_tag.get_text().strip() if like_tag else "N/A"
return {
'title': title,
'author': author,
'publish_time': publish_time,
'url': real_url,
'content': content,
'read_count': read_num,
'like_count': like_num
}
except Exception as e:
print(f"[-] 解析文章失败: {str(e)}")
self.failed_urls.append(url)
return None
def save_article(self, article_data, index):
"""保存文章到文件"""
if not article_data:
return False
try:
# 清理文件名中的非法字符
clean_title = re.sub(r'[\\/*?:"<>|]', "", article_data['title'])
if not clean_title:
clean_title = f"无标题_{index}"
# 创建文件名
timestamp = int(time.time())
filename = f"{index:03d}_{clean_title[:50]}_{timestamp}.txt"
filepath = os.path.join(self.output_dir, "articles", filename)
# 写入文件
with open(filepath, 'w', encoding='utf-8') as f:
f.write(f"标题: {article_data['title']}\n")
f.write(f"作者: {article_data['author']}\n")
f.write(f"发布时间: {article_data['publish_time']}\n")
f.write(f"原文链接: {article_data['url']}\n")
f.write(f"阅读数: {article_data['read_count']}\n")
f.write(f"点赞数: {article_data['like_count']}\n")
f.write("\n===== 正文内容 =====\n\n")
f.write(article_data['content'])
self.article_count += 1
print(f"[+] 已保存: {filename}")
return True
except Exception as e:
print(f"[-] 保存文章失败: {str(e)}")
return False
def save_summary(self, article_list):
"""保存文章摘要信息"""
try:
summary_path = os.path.join(self.output_dir, "summary.json")
summary = {
'crawl_time': time.strftime("%Y-%m-%d %H:%M:%S"),
'total_articles': len(article_list),
'success_count': self.article_count,
'failed_count': len(self.failed_urls),
'failed_urls': self.failed_urls,
'articles': []
}
for article in article_list:
summary['articles'].append({
'title': article['title'],
'url': article['url'],
'publish_time': article['publish_time'],
'cover': article['cover'],
'digest': article['digest']
})
with open(summary_path, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
print(f"[+] 已保存摘要文件: summary.json")
return True
except Exception as e:
print(f"[-] 保存摘要失败: {str(e)}")
return False
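    # Illustrative shape of the resulting summary.json (field names taken from the code
    # above; the values shown are made up):
    # {
    #   "crawl_time": "2024-01-01 12:00:00",
    #   "total_articles": 10,
    #   "success_count": 9,
    #   "failed_count": 1,
    #   "failed_urls": ["https://mp.weixin.qq.com/s?..."],
    #   "articles": [{"title": "...", "url": "...", "publish_time": 1700000000,
    #                 "cover": "...", "digest": "..."}]
    # }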
def run(self, count=10):
"""运行爬虫"""
start_time = time.time()
# 获取公众号biz_id
if not self.biz_id and self.nickname:
self.biz_id = self.search_public_account(self.nickname)
if not self.biz_id:
print("[-] 无法获取公众号ID,请检查输入")
return False
# 获取文章列表
article_list = self.get_article_list(self.biz_id, count)
if not article_list:
print("[-] 未获取到文章列表")
return False
# 爬取并保存每篇文章
print(f"\n[*] 开始爬取 {len(article_list)} 篇文章...")
for idx, article in enumerate(article_list, 1):
article_data = self.parse_article(article['url'])
self.save_article(article_data, idx)
time.sleep(self.delay + random.uniform(0, 1.5))
# 保存摘要信息
self.save_summary(article_list)
# 输出统计信息
elapsed = time.time() - start_time
print(f"\n[+] 爬取完成! 共处理 {len(article_list)} 篇文章")
print(f" 成功: {self.article_count} | 失败: {len(self.failed_urls)}")
print(f" 耗时: {elapsed:.2f}秒 | 平均每篇: {elapsed / len(article_list):.2f}秒")
print(f" 输出目录: {os.path.abspath(self.output_dir)}")
return True
def main():
parser = argparse.ArgumentParser(
description="微信公众号文章爬虫工具",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# 公众号识别参数(二选一)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-b', '--biz', help="公众号biz ID")
    group.add_argument('-n', '--nickname', help="公众号昵称")
    # 其他参数
    parser.add_argument('-c', '--cookie', required=True, help="微信公众平台登录后的Cookie")
    parser.add_argument('-o', '--output', default="wechat_output", help="输出目录")
parser.add_argument('-d', '--delay', type=float, default=3.0,
help="请求延迟时间(秒)")
parser.add_argument('-a', '--amount', type=int, default=10,
help="爬取文章数量")
args = parser.parse_args()
print("=" * 60)
print(f"微信公众号文章爬虫 - 开始执行")
print("=" * 60)
# 初始化爬虫
crawler = WeChatCrawler(
biz_id=args.biz,
nickname=args.nickname,
cookie=args.cookie,
output_dir=args.output,
delay=args.delay
)
# 运行爬虫
crawler.run(count=args.amount)
if __name__ == "__main__":
main()
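# Example invocation (illustrative; the script filename and the nickname are placeholders,
# and the cookie must come from your own logged-in mp.weixin.qq.com session):
#   python wechat_crawler.py -n "某公众号" -c "<your cookie string>" -a 5 -d 3 -o wechat_output
# The class can also be used directly:
#   crawler = WeChatCrawler(nickname="某公众号", cookie="<your cookie string>")
#   crawler.run(count=5)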