test_webhook · Commit 14bb618e, authored Oct 10, 2025 by 时海鑫
Merge branch 'dev' into 'master'
test1
See merge request !15
Parents: 46062ce5, aea13852
Showing 1 changed file with 412 additions and 0 deletions

WXGZH.py · new file mode 100644 · +412 -0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WeChat Official Account article crawler
Version: 1.5
Author: CAN
Purpose: crawl the historical article list and article contents of a given official account
"""
import os
import re
import json
import time
import random
import argparse
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs

# Pool of spoofed User-Agent headers; one is picked at random to reduce the risk of being blocked
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15',
    'Mozilla/5.0 (iPhone; CPU iPhone OS 16_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Mobile/15E148 Safari/604.1',
    'Mozilla/5.0 (Linux; Android 13; SM-S901U) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.196 Mobile Safari/537.36'
]

class WeChatCrawler:
    def __init__(self, biz_id=None, nickname=None, cookie=None, output_dir="output", delay=2):
        """
        Initialize the crawler.
        :param biz_id: official account biz ID (optional)
        :param nickname: official account nickname (optional)
        :param cookie: WeChat cookie (required)
        :param output_dir: output directory
        :param delay: delay between requests, in seconds
        """
        self.session = requests.Session()
        self.cookie = cookie
        self.output_dir = output_dir
        self.delay = delay
        self.biz_id = biz_id
        self.nickname = nickname
        self.article_count = 0
        self.failed_urls = []

        # Create the output directories
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(os.path.join(output_dir, "articles"), exist_ok=True)

        # Attach the cookie to the session headers
        if cookie:
            self.session.headers.update({'Cookie': cookie})

    def get_random_headers(self):
        """Build a randomized set of request headers."""
        return {
            'User-Agent': random.choice(USER_AGENTS),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Referer': 'https://mp.weixin.qq.com/'
        }

    def search_public_account(self, nickname):
        """Search for an official account by nickname and return its biz_id (fakeid)."""
        print(f"[*] 正在搜索公众号: {nickname}")
        search_url = "https://mp.weixin.qq.com/cgi-bin/searchbiz"
        params = {
            'action': 'search_biz',
            'lang': 'zh_CN',
            'f': 'json',
            'ajax': '1',
            'random': random.random(),
            'query': nickname,
            'begin': '0',
            'count': '5'
        }
        try:
            headers = self.get_random_headers()
            response = self.session.get(search_url, params=params, headers=headers)
            response.raise_for_status()
            data = response.json()
            if 'list' not in data:
                print(f"[-] 搜索失败,响应: {data}")
                return None
            accounts = data['list']
            if not accounts:
                print(f"[-] 未找到昵称为 '{nickname}' 的公众号")
                return None
            # Prefer an exact (case-insensitive) nickname match
            for acc in accounts:
                if acc['nickname'].lower() == nickname.lower():
                    print(f"[+] 找到公众号: {acc['nickname']} (biz: {acc['fakeid']})")
                    return acc['fakeid']
            # Otherwise fall back to the first result
            first_acc = accounts[0]
            print(f"[+] 找到相似公众号: {first_acc['nickname']} (biz: {first_acc['fakeid']})")
            return first_acc['fakeid']
        except Exception as e:
            print(f"[-] 搜索失败: {str(e)}")
            return None

    def get_article_list(self, biz_id, count=10):
        """Fetch the account's article list."""
        print(f"[*] 正在获取文章列表 (biz: {biz_id})")
        article_list = []
        offset = 0
        while len(article_list) < count:
            url = "https://mp.weixin.qq.com/cgi-bin/appmsg"
            params = {
                'action': 'list_ex',
                'begin': str(offset),
                'count': '5',
                'fakeid': biz_id,
                'type': '9',
                'query': '',
                'token': self.get_token(),
                'lang': 'zh_CN',
                'f': 'json',
                'ajax': '1'
            }
            try:
                headers = self.get_random_headers()
                response = self.session.get(url, params=params, headers=headers)
                response.raise_for_status()
                data = response.json()
                if 'app_msg_list' not in data:
                    print(f"[-] 获取文章列表失败,响应: {data}")
                    break
                articles = data['app_msg_list']
                if not articles:
                    print("[+] 已获取所有文章")
                    break
                for article in articles:
                    if len(article_list) >= count:
                        break
                    article_list.append({
                        'title': article['title'],
                        'url': article['link'],
                        'publish_time': article['create_time'],
                        'cover': article['cover'],
                        'digest': article['digest']
                    })
                offset += len(articles)
                print(f"[+] 已获取 {len(article_list)}/{count} 篇文章")
                time.sleep(self.delay + random.uniform(0, 1))
            except Exception as e:
                print(f"[-] 获取文章列表出错: {str(e)}")
                break
        return article_list

    def get_token(self):
        """Return a pseudo token value (simplified implementation)."""
        return str(int(time.time() * 1000))

    def extract_real_url(self, url):
        """Resolve the real URL of a WeChat article."""
        try:
            # Only WeChat article URLs need resolving
            if 'mp.weixin.qq.com/s?' not in url:
                return url
            headers = self.get_random_headers()
            response = requests.get(url, headers=headers, allow_redirects=False)
            # Follow an explicit redirect if one is returned
            if 300 <= response.status_code < 400:
                location = response.headers.get('Location')
                if location:
                    return location
            # Otherwise look for the og:url meta tag in the HTML
            soup = BeautifulSoup(response.text, 'html.parser')
            meta = soup.find('meta', attrs={'property': 'og:url'})
            if meta and meta.get('content'):
                return meta.get('content')
            # As a last resort, extract the link from inline JavaScript
            pattern = re.compile(r'var\s+msg_link\s*=\s*"([^"]+)"')
            match = pattern.search(response.text)
            if match:
                return match.group(1)
            return url
        except Exception as e:
            print(f"[-] 提取真实URL失败: {str(e)}")
            return url

    def parse_article(self, url):
        """Parse the content of an article page."""
        try:
            real_url = self.extract_real_url(url)
            print(f"[*] 解析文章: {real_url}")
            headers = self.get_random_headers()
            response = requests.get(real_url, headers=headers)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Extract article metadata
            title = soup.find('h1', id='activity-name').get_text().strip() if soup.find('h1', id='activity-name') else "无标题"
            author = soup.find('span', id='js_name').get_text().strip() if soup.find('span', id='js_name') else "未知作者"
            publish_time = soup.find('em', id='publish_time').get_text().strip() if soup.find('em', id='publish_time') else ""

            # Extract the article body
            content_div = soup.find('div', id='js_content')
            if not content_div:
                return None

            # Remove script/style/iframe elements
            for elem in content_div.find_all(['script', 'style', 'iframe']):
                elem.decompose()

            # Replace images with Markdown-style references
            for img in content_div.find_all('img'):
                if img.get('data-src'):
                    img.replace_with(f"\n![]({img.get('data-src')})\n")
                elif img.get('src'):
                    img.replace_with(f"\n![]({img.get('src')})\n")

            # Plain-text content
            content = content_div.get_text().strip()

            # Read count and like count
            read_num = soup.find('span', class_='read_num').get_text().strip() if soup.find('span', class_='read_num') else "N/A"
            like_num = soup.find('span', class_='like_num').get_text().strip() if soup.find('span', class_='like_num') else "N/A"

            return {
                'title': title,
                'author': author,
                'publish_time': publish_time,
                'url': real_url,
                'content': content,
                'read_count': read_num,
                'like_count': like_num
            }
        except Exception as e:
            print(f"[-] 解析文章失败: {str(e)}")
            self.failed_urls.append(url)
            return None

    def save_article(self, article_data, index):
        """Save one article to a text file."""
        if not article_data:
            return False
        try:
            # Strip characters that are illegal in file names
            clean_title = re.sub(r'[\\/*?:"<>|]', "", article_data['title'])
            if not clean_title:
                clean_title = f"无标题_{index}"
            # Build the file name
            timestamp = int(time.time())
            filename = f"{index:03d}_{clean_title[:50]}_{timestamp}.txt"
            filepath = os.path.join(self.output_dir, "articles", filename)
            # Write the file
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(f"标题: {article_data['title']}\n")
                f.write(f"作者: {article_data['author']}\n")
                f.write(f"发布时间: {article_data['publish_time']}\n")
                f.write(f"原文链接: {article_data['url']}\n")
                f.write(f"阅读数: {article_data['read_count']}\n")
                f.write(f"点赞数: {article_data['like_count']}\n")
                f.write("\n===== 正文内容 =====\n\n")
                f.write(article_data['content'])
            self.article_count += 1
            print(f"[+] 已保存: {filename}")
            return True
        except Exception as e:
            print(f"[-] 保存文章失败: {str(e)}")
            return False

    def save_summary(self, article_list):
        """Save a JSON summary of the crawled articles."""
        try:
            summary_path = os.path.join(self.output_dir, "summary.json")
            summary = {
                'crawl_time': time.strftime("%Y-%m-%d %H:%M:%S"),
                'total_articles': len(article_list),
                'success_count': self.article_count,
                'failed_count': len(self.failed_urls),
                'failed_urls': self.failed_urls,
                'articles': []
            }
            for article in article_list:
                summary['articles'].append({
                    'title': article['title'],
                    'url': article['url'],
                    'publish_time': article['publish_time'],
                    'cover': article['cover'],
                    'digest': article['digest']
                })
            with open(summary_path, 'w', encoding='utf-8') as f:
                json.dump(summary, f, ensure_ascii=False, indent=2)
            print(f"[+] 已保存摘要文件: summary.json")
            return True
        except Exception as e:
            print(f"[-] 保存摘要失败: {str(e)}")
            return False

    def run(self, count=10):
        """Run the crawler."""
        start_time = time.time()
        # Resolve the biz_id from the nickname if necessary
        if not self.biz_id and self.nickname:
            self.biz_id = self.search_public_account(self.nickname)
        if not self.biz_id:
            print("[-] 无法获取公众号ID,请检查输入")
            return False
        # Fetch the article list
        article_list = self.get_article_list(self.biz_id, count)
        if not article_list:
            print("[-] 未获取到文章列表")
            return False
        # Crawl and save each article
        print(f"\n[*] 开始爬取 {len(article_list)} 篇文章...")
        for idx, article in enumerate(article_list, 1):
            article_data = self.parse_article(article['url'])
            self.save_article(article_data, idx)
            time.sleep(self.delay + random.uniform(0, 1.5))
        # Save the summary
        self.save_summary(article_list)
        # Print statistics
        elapsed = time.time() - start_time
        print(f"\n[+] 爬取完成! 共处理 {len(article_list)} 篇文章")
        print(f"    成功: {self.article_count} | 失败: {len(self.failed_urls)}")
        print(f"    耗时: {elapsed:.2f}秒 | 平均每篇: {elapsed / len(article_list):.2f}秒")
        print(f"    输出目录: {os.path.abspath(self.output_dir)}")
        return True


def main():
    parser = argparse.ArgumentParser(
        description="微信公众号文章爬虫工具",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    # Account identification (exactly one of the two is required)
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-b', '--biz', help="公众号biz ID")
    group.add_argument('-n', '--nickname', help="公众号昵称")
    # Other options
    parser.add_argument('-c', '--cookie', required=True, help="微信Cookie (必需)")
    parser.add_argument('-o', '--output', default="wechat_output", help="输出目录")
    parser.add_argument('-d', '--delay', type=float, default=3.0, help="请求延迟时间(秒)")
    parser.add_argument('-a', '--amount', type=int, default=10, help="爬取文章数量")
    args = parser.parse_args()

    print("=" * 60)
    print(f"微信公众号文章爬虫 - 开始执行")
    print("=" * 60)

    # Initialize the crawler
    crawler = WeChatCrawler(
        biz_id=args.biz,
        nickname=args.nickname,
        cookie=args.cookie,
        output_dir=args.output,
        delay=args.delay
    )
    # Run the crawler
    crawler.run(count=args.amount)


if __name__ == "__main__":
    main()
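
For reference, here is a minimal usage sketch (not part of the commit). It assumes WXGZH.py is importable from the working directory; the nickname and cookie values are placeholders, and a real run needs a logged-in mp.weixin.qq.com cookie. The leading comment shows the equivalent command-line invocation through main().

# Equivalent CLI call (placeholder values):
#   python WXGZH.py -n "ExampleAccount" -c "<mp.weixin.qq.com cookie>" -o wechat_output -d 3 -a 5
from WXGZH import WeChatCrawler

crawler = WeChatCrawler(
    nickname="ExampleAccount",           # placeholder nickname, resolved to a biz_id via search_public_account()
    cookie="<mp.weixin.qq.com cookie>",  # placeholder; a valid logged-in cookie is required
    output_dir="wechat_output",
    delay=3.0,
)
crawler.run(count=5)  # fetch the five most recent articles and write them under wechat_output/articles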