python爬取A站(AcFun)整部番剧的弹幕

由 月琳 发布
  | 131 次浏览

前言

我想给我写的播放器换一套演示视频和弹幕,而弹幕正好在AcFun(以下简称A站)有,于是想着利用Python写一个脚本爬取完整的弹幕列表。本文只讲解爬取A站弹幕,至于弹幕格式转换不在本文范畴。

注:爬取番剧弹幕和爬取个人视频略有不同,本文只适用于爬取番剧的弹幕,但思路都是一样的,在两者有差异的地方我会有所标注。


分析网页

找到弹幕接口的请求与响应内容

首先打开想要爬取弹幕的视频,这里我用番剧《摇曳露营△》的播放页作示例:链接,然后打开开发者工具(F12或者Ctrl+Shift+i),将选项卡切换到”网络“,接着再刷新一次页面。面对一大堆数据,我们可以使用Ctrl+F进行搜索,挑一条弹幕作为搜索内容即可。

1

(图中右侧的载荷里是请求参数,不同视频的resourceId也不同)

很容易就找到了弹幕接口的信息,不过这里能找到两个弹幕的接口,分别是pollByPositionlist

经过研究发现,pollByPosition接口是视频播放时分段进行获取弹幕的接口,这种隔一段时间获取一次弹幕的方式能确保弹幕的实时性;而list接口则是网页右侧展示的弹幕列表的接口,该接口可以获取到按时间正序与倒序排序的弹幕。两个接口都可以进行爬取数据,本文就用list接口作为示例

2

通过手动翻页再分析请求头发现请求参数的pcursor对应上了页码,而响应内容也有一个pcursor刚好对应上了下一页的页码;当我选择最后一页时,发现响应内容的pcursor变成了“no_more”,我想这应该代表已经是最后一页没有下一页了。

3

列出请求表

请求地址https://www.acfun.cn/rest/pc-direct/new-danmaku/list
请求方式POST
请求内容类型(Content-Type)application/x-www-form-urlencoded
请求参数resourceId视频的唯一id,等同于videoId。
resourceType似乎固定填9。
enableAdvanced填true或false。是否开启高级弹幕(可能)。
pcursor填数字。分页的页码,从1开始。
count填数字。获取的弹幕数量,限制在[1~200]之间,再多也等同200。
sortType填1或2。1为不排序,2为排序(挺奇葩的)。
asc填true或false。true为正序,false为倒序。

列出响应表

响应内容类型(content-type)application/json
响应参数result似乎固定为0
danmakus弹幕列表
styleDanmakuCount可能是花里胡哨弹幕的数量?
requestId反正用不到
pcursor下一页的页码,当值为“no_more”则表示没有下一页
host-name反正用不到
totalCount该视频的总弹幕数量

经过分析可以得到以上的请求与响应表格(基于效果进行的猜测,不一定100%准确)。

在代码的实现上我们可以爬取每一页的弹幕,组合在一起就能得到所有弹幕的集合。但在这之前也有个问题,resourceId没有(通过代码的)获取方式,不可能每次都通过开发者选项获取吧。不过好在也有解决方法:

首先想到通过查看网页源代码的方式,在页面空白处右键鼠标即可找到:

4

利用Ctrl+F查找resourceId,但寻了一会没找到有用的信息(而且都在js代码里不太好反推),索性直接搜寻我当前视频已知的resourceId:11137579(上文中的响应内容有),于是找到了这两个有效的信息(下图左侧):

5

经过搜索发现11137579经常在window.pageInfowindow.bangumiList这两行中出现,于是我在网页的控制台直接输入了个这两个变量名,这样可以非常方便地查看那两行后面赋值的json数据。经过查阅,可以确定pageInfo变量中的videoId即为该视频的resourceId,而bangumiList中的items有整个番剧的所有话/集(第1话、第2话...这种),每一话中同样也有videoId。这样一来就可以一次性获取到当前视频的resourceId和整部番中所有话的resourceId,一举两得。

注:非番剧的视频的pageInfo这一行是window.pageInfo = window.videoInfo,并且视频的resourceId是currentVideoId,与番剧视频略有不同。


代码实践

实现获取当前的resourceId与整部番的数据

通过链接地址来获取resourceId(videoId),由于该id是存在于网页的js代码中,固没办法用parsel库,于是转而使用正则表达式匹配。实现步骤如下:

  1. 使用request库获取接口的数据

    这就很简单了,不过user-agent需要自定义,否则请求会被A站拒绝。

    ### 只适合于番剧
    import json
    import re
    import requests
    
    web_link = "https://www.acfun.cn/bangumi/aa6000991" # 地址
    response = requests.get(
        web_link,
        headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
    )
    # print(response.text) # 响应内容
  2. 使用正则匹配

    # 正则获取pageInfo(仅适用于番剧视频)
    page_info_str = re.findall(r'window\.pageInfo = window\.bangumiData = \{(.{0,})\};\n', response.text)[0] 
    info = json.loads('{' + page_info_str + '}') # Map类型的pageInfo数据
    print("当前页面的(番剧)视频videoId:{}".format(page_info["videoId"]))
    
    # 正则获取bangumiList(仅适用于番剧视频)
    bangumi_list_str = re.findall(r'window\.bangumiList = \{(.{0,})\};\n', response.text)[0]
    bangumi_list = json.loads('{' + bangumi_list_str + '}')  # Map类型的bangumiList数据
    for item in bangumi_list["items"]:
        print("{}:{},videoId:{}".format(item["episodeName"], item["title"], item["videoId"]))

    输出结果如下

    6

    注:如果是非番剧视频,即个人用户上传的视频,则这样获取pageInfo:

    page_info_str = re.findall(r'window\.pageInfo = window\.videoInfo = \{(.{0,})\};\n', response.text)[0]
    page_info = json.loads('{' + page_info_str + '}')
    print("当前页面的视频videoId:{}".format(page_info["currentVideoId"]))
  3. 最后将代码封装成函数

    def get_bangumi_video_info(web_link):
        """ 通过网站链接获取当前视频与整部番剧的信息(仅限番剧页)\n
        Args:
            web_link (String): 网页链接
        return for example:
            {
                "currentVideoId" : 11137579,
                "bangumiList": [
                    {"episodeName": "第1话", "title": "富士山与咖喱面", "videoId": 11137579},
                    ...
                ],
            }
        """
        result = {
            "currentVideoId" : None, # 当前视频的videoId
            "bangumiList": [], # 整部番剧的剧集
        } # 返回的结果
        response = requests.get(
            web_link,
            headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
        )    
        page_info_str = re.findall(r'window\.pageInfo = window\.bangumiData = \{(.{0,})\};\n', response.text)[0]
        page_info = json.loads('{' + page_info_str + '}')
        result["currentVideoId"] = page_info["videoId"]
        bangumi_list_str = re.findall(r'window\.bangumiList = \{(.{0,})\};\n', response.text)[0]
        bangumi_list = json.loads('{' + bangumi_list_str + '}')
        for item in bangumi_list["items"]:
            bangumi = {
                "episodeName": item["episodeName"],
                "title": item["title"],
                "videoId": item["videoId"],
            }
            result["bangumiList"].append(bangumi)
        return result
    
    print(get_bangumi_video_info(web_link))

    输出结果如下图左侧,而右侧是格式化json后的数据

    7

    有了这个函数,我们就可以很方便的提取出单前视频和整部番剧每一集的信息,为下文爬取整部番的弹幕提供基础。

实现通过videoId获取弹幕数据

一步一步实现,先从爬取单个视频的一页弹幕开始,再单个视频的完整弹幕列表,最后再实现爬取整个番剧所有视频的弹幕。

  1. 通过videoId获取第一页的200个弹幕,并写入到output.json文件中

    import requests
    
    video_id= 11137579 # 等同于resourceId
    pcursor = 1 # 等同于页码
    response = requests.post(
        "https://www.acfun.cn/rest/pc-direct/new-danmaku/list", 
        data={
            "resourceId": video_id,
            "resourceType": 9,
            "enableAdvanced": True,
            "pcursor": pcursor,
            "count": 200,
            "sortType": 2,
            "asc": True,
        },
        headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
    )
    # 写入到output.json文件中
    with open('output.json', 'w', encoding='utf_8', newline='') as file:
        file.write(response.text)

    将output.json文件用编辑器格式化后如下:

    8

    成功爬取到一页弹幕,接下来就该实现爬取全部弹幕了。

  2. 爬取完整的弹幕列表

    由于响应数据是json格式,没法直接当成字典访问,所以就需要python自带json库进行转换:json.loads(str)函数将json字符串转换成Map字典类型,而json.dumps(map)则是将Map字典类型转换成json字符串。

    于是代码实现如下:

    import json
    from time import sleep
    import requests
    
    video_id= 11137579 # 等同于resourceId
    pcursor = 1 # 等同于页码
    danmakus = [] # 弹幕列表
    output_json = {} # 最终输出的结果,包含了danmakus和其他相关信息
    while True:
        response = requests.post(
            "https://www.acfun.cn/rest/pc-direct/new-danmaku/list", 
            data={
                "resourceId": video_id,
                "resourceType": 9,
                "enableAdvanced": True,
                "pcursor": pcursor,
                "count": 200,
                "sortType": 2,
                "asc": True,
            },
            headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
        )
        # 将json字符串格式化为Map字典
        response_json = json.loads(response.text)
        # 往弹幕列表里追加本次获取到的弹幕
        danmakus.extend(response_json["danmakus"])
        if response_json["pcursor"] == "no_more":
            # 没有下一页了,就跳出循环
            break
        pcursor += 1 # 下一页
        sleep(0.1) # 休息0.1秒,防止访问频率过快
        
    output_json = response_json # 最后一页的数据直接拿来用
    output_json["danmakus"] = danmakus # 将完整的弹幕集合替换进去
    # 写入output.json文件
    with open('output.json', 'w', encoding='utf_8', newline='') as file:
        file.write(json.dumps(output_json))

    这里使用了while死循环,每次爬取一页数据并且页码依次累加,直到响应数据里的pcursor为"no_more"时停止循环。使用sleep(seconds)函数进行睡眠,避免访问过于频繁被A站给封了IP。

    打开output.json查看:

    9

    这样就爬取到了完整的弹幕列表,足足6千多条。但是我发现里面本来应有的中文弹幕都变成了\u****这种Unicode编码字符,显然编码不对,可我在file输出指定了encoding='utf_8'不应该编码出错啊。

    调试了代码结合搜索引擎,发现是json库的锅,json.dumps()函数里有一条参数:ensure_ascii=True,该参数默认开启导致转换后的字符串编码统统变成ASCII,只需要将其赋值为False即可输出中文。

    于是最后一行代码(37行)改成:

        file.write(json.dumps(output_json, ensure_ascii=False))

    之后,output.json的中文就正常显示了。

  3. 将以上代码封装为函数

    在上文中已经定义了一个函数get_bangumi_video_info(web_link),我们可以利用该函数返回的信息进行结合,实现一个request_danmakus(video_id, dir_path)函数:传入视频id和存放的文件夹路径,自动爬取该视频的弹幕保存为文件。

    代码实现如下:

    def request_danmakus(video_id, dir_path):
        """爬取番剧每集的弹幕存储到文件夹中,自动命名。\n
        Args:
            video_id (String): resourceId
            dir_path (String): 存放的文件夹路径
        """
        pcursor = 1 # 页码
        danmakus = [] # 用于输出结果的弹幕列表
        output_json = {} # 最终输出的结果,包含了danmakus和其他相关信息
        while True:
            response = requests.post(
                "https://www.acfun.cn/rest/pc-direct/new-danmaku/list", 
                data={
                    "resourceId": video_id,
                    "resourceType": 9,
                    "enableAdvanced": True,
                    "pcursor": pcursor,
                    "count": 200,
                    "sortType": 2,
                    "asc": True,
                },
                headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
            )
            # 将json字符串格式化为Map字典
            response_json = json.loads(response.text)
            # 往弹幕列表里追加本次获取到的弹幕
            danmakus.extend(response_json["danmakus"])
            if response_json["pcursor"] == "no_more":
                # 没有下一页了,就跳出循环
                break
            pcursor += 1 # 下一页
            sleep(0.1) # 休息0.1秒,防止访问频率过快
        output_json = response_json # 最后一页的数据直接拿来用
        output_json["danmakus"] = danmakus # 将完整的弹幕集合替换进去
        # 单个弹幕文件的路径
        file_path = "{}/{}-{}.json".format(dir_path, bangumi["episodeName"], bangumi["title"])
        with open(file_path, 'w', encoding='utf_8', newline='') as file:
            file.write(json.dumps(output_json, ensure_ascii=False))
        print("成功获取:{}".format(file_path))

    调用示例:request_danmakus(11137579, "./output_danmakus"),执行完毕后在output_danmakus文件夹内就可以看到第1话-富士山与咖喱面.json的弹幕文件,不过多赘述。

实现自动化爬取整部番剧的弹幕

  • get_bangumi_video_inforequest_danmakus函数结合使用,可以编写出自动化脚本:

    import json
    import os
    import re
    from time import sleep
    import requests
    
    output_dir_path = "./output_danmakus" # 输出弹幕文件的目录路径
    print("该脚本功能:爬取A站某部番剧的每一集所有弹幕,导出到'{}'目录中".format(output_dir_path))
    url = input("请输入任意一集的链接(例如https://www.acfun.cn/bangumi/aa6000991):") # 番剧网站的链接(任意一集)
    
    def get_bangumi_video_info(web_link):
        # 省略...上文中有...
      
    def request_danmakus(video_id, dir_path):
        # 省略...上文中有...
        
    if not os.path.exists(dir):
        os.makedirs(dir) # 文件夹不存在则创建
    bangumi_list = get_bangumi_video_info(url)["bangumiList"] # 获取到番剧的视频列表
    print("开始获取弹幕...")
    for bangumi in bangumi_list:
        video_id = bangumi["videoId"] # resourceId
        request_danmakus(video_id, output_dir_path)

    只需输入某部番剧任意一集的网页链接,就可以自动爬取每一集的弹幕存储在./output_danmakus目录中,并且弹幕文件的命名也人性化处理了。

  • 效果如下图:

    10


最终完整脚本代码

import json
import os
import re
from time import sleep
import requests

output_dir_path = "./output_danmakus" # 输出弹幕文件的目录路径
print("该脚本功能:爬取A站某部番剧的每一集所有弹幕,导出到'{}'目录中".format(output_dir_path))
url = input("请输入任意一集的链接(例如https://www.acfun.cn/bangumi/aa6000991):") # 番剧网站的链接(任意一集)

def get_bangumi_video_info(web_link):
    """ 通过网站链接获取当前视频与整部番剧的信息(仅限番剧页)\n
    Args:
        web_link (String): 网页链接
    return for example:
        {
            "currentVideoId" : 11137579,
            "bangumiList": [
                {"episodeName": "第1话", "title": "富士山与咖喱面", "videoId": 11137579},
                ...
            ],
        }
    """
    result = {
        "currentVideoId" : None,
        "bangumiList": [],
    } # 返回的结果
    response = requests.get(
        web_link,
        headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
    )    
    page_info_str = re.findall(r'window\.pageInfo = window\.bangumiData = \{(.{0,})\};\n', response.text)[0]
    page_info = json.loads('{' + page_info_str + '}')
    result["currentVideoId"] = page_info["videoId"]
    bangumi_list_str = re.findall(r'window\.bangumiList = \{(.{0,})\};\n', response.text)[0]
    bangumi_list = json.loads('{' + bangumi_list_str + '}')
    for item in bangumi_list["items"]:
        bangumi = {
            "episodeName": item["episodeName"],
            "title": item["title"],
            "videoId": item["videoId"],
        }
        result["bangumiList"].append(bangumi)
    return result


def request_danmakus(video_id, dir_path):
    """爬取番剧每集的弹幕存储到文件夹中,自动命名。\n
    Args:
        video_id (String): resourceId
        dir_path (String): 存放的文件夹路径
    """
    pcursor = 1 # 页码
    danmakus = [] # 用于输出结果的弹幕列表
    output_json = {} # 最终输出的结果,包含了danmakus和其他相关信息
    while True:
        response = requests.post(
            "https://www.acfun.cn/rest/pc-direct/new-danmaku/list", 
            data={
                "resourceId": video_id,
                "resourceType": 9,
                "enableAdvanced": True,
                "pcursor": pcursor,
                "count": 200,
                "sortType": 2,
                "asc": True,
            },
            headers={"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36"}
        )
        # 将json字符串格式化为Map字典
        response_json = json.loads(response.text)
        # 往弹幕列表里追加本次获取到的弹幕
        danmakus.extend(response_json["danmakus"])
        if response_json["pcursor"] == "no_more":
            # 没有下一页了,就跳出循环
            break
        pcursor += 1 # 下一页
        sleep(0.1) # 休息0.1秒,防止访问频率过快
    output_json = response_json # 最后一页的数据直接拿来用
    output_json["danmakus"] = danmakus # 将完整的弹幕集合替换进去
    # 单个弹幕文件的路径
    file_path = "{}/{}-{}.json".format(dir_path, bangumi["episodeName"], bangumi["title"])
    with open(file_path, 'w', encoding='utf_8', newline='') as file:
        file.write(json.dumps(output_json, ensure_ascii=False))
    print("成功获取:{}".format(file_path))

if not os.path.exists(output_dir_path):
    os.makedirs(output_dir_path) # 文件夹不存在则创建
bangumi_list = get_bangumi_video_info(url)["bangumiList"] # 获取到番剧的视频列表
print("开始获取弹幕...")
for bangumi in bangumi_list:
    video_id = bangumi["videoId"] # resourceId
    request_danmakus(video_id, output_dir_path)

其他

  • 如果需要将弹幕json格式转换成xml格式,需要注意可能存在<>&这些html特殊字符,需要使用 html.escape(response.text, quote=False)函数代码对响应内容进行额外处理。将上文中出现的response_json代码替换成以下代码即可:

    # 原代码:response_json = json.loads(response.text)
    response_json = json.loads(html.escape(response.text, quote=False))
  • 上文中出现的sleep(0.1)可能比较保守,可以自行调整甚至直接去掉该行代码也没问题,反正都是大局域网

版权属于:月琳cc
本文链接:https://yleen.cc/archives/acfun-danmakus.html
作品采用《知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议》进行许可,转载请务必注明出处!

暂无评论

发表评论