xhs:小红书数据采集的高效Python SDK架构深度解析

张开发
2026/4/18 5:04:31 · 15 分钟阅读

分享文章

xhs:小红书数据采集的高效Python SDK架构深度解析
xhs小红书数据采集的高效Python SDK架构深度解析【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs在当今数据驱动的互联网时代小红书作为国内领先的社交电商平台其海量用户生成内容蕴含着巨大的商业价值和市场洞察。然而直接通过官方API获取数据往往面临诸多限制而传统的爬虫方案又需要处理复杂的反爬机制和动态加密逻辑。xhs项目应运而生为开发者提供了一个企业级的小红书数据采集解决方案。项目定位与技术价值xhs项目并非简单的爬虫工具而是一个完整的Python SDK框架它通过深入研究小红书Web端的请求机制封装了一套稳定可靠的数据采集接口。项目的核心价值在于技术架构优势协议层封装将复杂的HTTP请求、签名验证、Cookie管理抽象为简洁的API调用模块化设计核心功能与辅助工具分离便于维护和扩展错误处理机制内置完善的异常处理应对各种网络异常和反爬策略生产环境就绪特性支持代理配置适应企业级部署环境提供完整的类型提示提升开发体验内置连接池管理优化网络性能核心架构深度解析请求处理引擎设计xhs的核心架构围绕XhsClient类展开该类采用了分层的设计理念class XhsClient: def __init__(self, cookieNone, user_agentNone, timeout10, proxiesNone, signNone): self.proxies proxies self.__session: requests.Session requests.session() self.timeout timeout self.external_sign sign self._host https://edith.xiaohongshu.com self._creator_host https://creator.xiaohongshu.com self._customer_host https://customer.xiaohongshu.com self.home https://www.xiaohongshu.com # 请求头配置 self.__session.headers { user-agent: self.user_agent or DEFAULT_USER_AGENT, Content-Type: application/json, }架构设计亮点会话管理使用requests.Session维护持久连接减少TCP握手开销多端点支持针对小红书的不同服务端edith、creator、customer提供独立配置插件式签名支持外部签名函数注入便于应对签名算法更新签名验证机制签名验证是小红书API安全机制的核心xhs提供了两种签名策略def _pre_headers(self, url: str, dataNone, quick_sign: bool False): if quick_sign: # 快速签名模式适用于创作者和客户服务端 signs sign(url, data, a1self.cookie_dict.get(a1)) self.__session.headers.update({x-s: signs[x-s]}) self.__session.headers.update({x-t: signs[x-t]}) self.__session.headers.update({x-s-common: signs[x-s-common]}) else: # 外部签名函数支持复杂的浏览器模拟签名 self.__session.headers.update( self.external_sign( url, data, a1self.cookie_dict.get(a1), web_sessionself.cookie_dict.get(web_session, ), ) )签名策略对比签名类型适用场景性能影响稳定性快速签名创作者/客户API低延迟中等浏览器模拟核心内容API高延迟高自定义签名特殊需求可变取决于实现错误处理体系xhs定义了完整的错误处理体系确保应用的健壮性from xhs.exception import ( DataFetchError, ErrorEnum, IPBlockError, NeedVerifyError, SignError ) def request(self, method, url, **kwargs): response self.__session.request( method, url, 
timeoutself.timeout, proxiesself.proxies, **kwargs ) # 状态码异常处理 if response.status_code 471 or response.status_code 461: verify_type response.headers[Verifytype] verify_uuid response.headers[Verifyuuid] raise NeedVerifyError( f出现验证码请求失败Verifytype: {verify_type}Verifyuuid: {verify_uuid}, responseresponse, verify_typeverify_type, verify_uuidverify_uuid) # 业务错误码处理 elif data.get(code) ErrorEnum.IP_BLOCK.value.code: raise IPBlockError(ErrorEnum.IP_BLOCK.value.msg, responseresponse) elif data.get(code) ErrorEnum.SIGN_FAULT.value.code: raise SignError(ErrorEnum.SIGN_FAULT.value.msg, responseresponse)核心功能实现详解内容搜索功能xhs提供了丰富的搜索接口支持多种排序和过滤条件def search_note( self, keyword: str, page: int 1, page_size: int 20, sort: str general, note_type: str 0 ): 搜索笔记 Args: keyword: 搜索关键词 page: 页码从1开始 page_size: 每页数量最大50 sort: 排序方式可选值: general(综合), time_desc(最新), popular(最热) note_type: 笔记类型0:全部, 1:图文, 2:视频 params { keyword: keyword, page: page, page_size: page_size, sort: sort, note_type: note_type, search_id: get_search_id(), image_formats: jpg,webp,avif, } return self.get(/api/sns/web/v1/search/notes, paramsparams)搜索参数优化建议使用合适的page_size平衡请求效率和数据量根据业务需求选择合适的排序策略利用search_id实现请求追踪用户数据采集获取用户信息和笔记列表是常见的业务场景def get_user_info(self, user_id: str): 获取用户基本信息 params {target_user_id: user_id} return self.get(/api/sns/web/v1/user/otherinfo, paramsparams) def get_user_notes( self, user_id: str, cursor: str , page_size: int 20 ): 获取用户发布的笔记列表 params { user_id: user_id, cursor: cursor, num: page_size, image_formats: jpg,webp,avif, } return self.get(/api/sns/web/v1/user_posted, paramsparams)内容详情解析获取笔记详情并提取多媒体资源from xhs.help import get_imgs_url_from_note, get_video_url_from_note def get_note_detail(self, note_id: str, xsec_token: str ): 获取笔记详情 params { source_note_id: note_id, image_formats: jpg,webp,avif, } if xsec_token: params[xsec_token] xsec_token note_data self.get(/api/sns/web/v1/feed, paramsparams) # 提取图片和视频URL images get_imgs_url_from_note(note_data) videos get_video_url_from_note(note_data) return { 
note: note_data, images: images, videos: videos }部署与应用实践Docker容器化部署xhs-api子项目提供了完整的Docker部署方案# Dockerfile FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD [python, app.py]部署配置示例# docker-compose.yml version: 3.8 services: xhs-api: build: ./xhs-api ports: - 8000:8000 environment: - REDIS_HOSTredis - REDIS_PORT6379 - REDIS_DB0 volumes: - ./data:/app/data depends_on: - redis redis: image: redis:alpine ports: - 6379:6379 volumes: - redis-data:/data volumes: redis-data:生产环境配置环境变量配置# .env.production XHS_API_TIMEOUT30 XHS_MAX_RETRIES3 XHS_REQUEST_DELAY1.5 REDIS_URLredis://localhost:6379/0 LOG_LEVELINFONginx反向代理配置# nginx.conf upstream xhs_api { server 127.0.0.1:8000; } server { listen 80; server_name api.yourdomain.com; location / { proxy_pass http://xhs_api; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # 请求限制 limit_req zoneapi burst20 nodelay; limit_req_status 429; } # 限流配置 limit_req_zone $binary_remote_addr zoneapi:10m rate10r/s; }性能优化指南请求频率控制策略合理的请求频率控制是保证服务稳定性的关键import time import random from datetime import datetime from functools import wraps def rate_limiter(max_calls10, period60): 请求频率限制装饰器 def decorator(func): calls [] wraps(func) def wrapper(*args, **kwargs): now datetime.now().timestamp() # 清理过期记录 calls[:] [call for call in calls if now - call period] if len(calls) max_calls: sleep_time period - (now - calls[0]) if sleep_time 0: time.sleep(sleep_time) calls.append(now) return func(*args, **kwargs) return wrapper return decorator class OptimizedXhsClient(XhsClient): rate_limiter(max_calls30, period60) # 每分钟最多30次请求 def search_note(self, keyword: str, page: int 1, page_size: int 20): return super().search_note(keyword, page, page_size)缓存策略实现利用Redis实现请求结果缓存减少重复请求import redis import json import hashlib from typing import Any, Optional class CachedXhsClient(XhsClient): def __init__(self, redis_client: 
redis.Redis, *args, **kwargs): super().__init__(*args, **kwargs) self.redis redis_client self.cache_ttl 3600 # 缓存1小时 def _generate_cache_key(self, method: str, params: dict) - str: 生成缓存键 param_str json.dumps(params, sort_keysTrue) key_data f{method}:{param_str} return hashlib.md5(key_data.encode()).hexdigest() def search_note(self, keyword: str, page: int 1, page_size: int 20) - dict: params { keyword: keyword, page: page, page_size: page_size } cache_key self._generate_cache_key(search_note, params) # 尝试从缓存获取 cached self.redis.get(cache_key) if cached: return json.loads(cached) # 执行实际请求 result super().search_note(keyword, page, page_size) # 缓存结果 self.redis.setex(cache_key, self.cache_ttl, json.dumps(result)) return result并发请求优化使用异步IO提升批量请求效率import asyncio import aiohttp from typing import List class AsyncXhsClient: def __init__(self, cookie: str, max_concurrent: int 5): self.cookie cookie self.semaphore asyncio.Semaphore(max_concurrent) async def batch_search(self, keywords: List[str]) - List[dict]: 批量搜索多个关键词 async with aiohttp.ClientSession() as session: tasks [] for keyword in keywords: task self._search_single(session, keyword) tasks.append(task) results await asyncio.gather(*tasks, return_exceptionsTrue) return results async def _search_single(self, session: aiohttp.ClientSession, keyword: str): 单个搜索请求 async with self.semaphore: # 实现异步请求逻辑 await asyncio.sleep(1) # 请求间隔 # 这里实现实际的异步请求 return {keyword: keyword, results: []}生态集成方案与数据分析工具集成xhs可以轻松集成到现有的数据分析流水线中import pandas as pd from sqlalchemy import create_engine from xhs import XhsClient class XhsDataPipeline: def __init__(self, db_url: str): self.client XhsClient(cookieyour_cookie) self.engine create_engine(db_url) def collect_and_store(self, keyword: str, pages: int 10): 收集数据并存储到数据库 all_notes [] for page in range(1, pages 1): try: results self.client.search_note(keyword, pagepage) notes results.get(items, []) # 数据清洗和转换 processed_notes self._process_notes(notes) all_notes.extend(processed_notes) # 控制请求频率 
time.sleep(1.5) except Exception as e: print(f第{page}页数据获取失败: {e}) continue # 保存到数据库 if all_notes: df pd.DataFrame(all_notes) df.to_sql(xhs_notes, self.engine, if_existsappend, indexFalse) return len(all_notes) def _process_notes(self, notes: list) - list: 处理原始笔记数据 processed [] for note in notes: processed.append({ note_id: note.get(id), title: note.get(title), user_id: note.get(user, {}).get(user_id), like_count: note.get(like_count, 0), collect_count: note.get(collect_count, 0), comment_count: note.get(comment_count, 0), share_count: note.get(share_count, 0), timestamp: note.get(time), collected_at: datetime.now().isoformat() }) return processed监控与告警系统集成Prometheus和Grafana实现服务监控from prometheus_client import Counter, Histogram, start_http_server import time # 定义监控指标 REQUEST_COUNT Counter(xhs_requests_total, Total requests to xhs API) REQUEST_LATENCY Histogram(xhs_request_latency_seconds, Request latency in seconds) ERROR_COUNT Counter(xhs_errors_total, Total errors from xhs API) class MonitoredXhsClient(XhsClient): def request(self, method, url, **kwargs): start_time time.time() REQUEST_COUNT.inc() try: response super().request(method, url, **kwargs) latency time.time() - start_time REQUEST_LATENCY.observe(latency) return response except Exception as e: ERROR_COUNT.inc() raise e # 启动监控服务器 start_http_server(8000)与消息队列集成使用消息队列实现异步数据处理import pika import json from xhs import XhsClient class XhsMessageQueue: def __init__(self, rabbitmq_url: str): self.client XhsClient(cookieyour_cookie) self.connection pika.BlockingConnection( pika.URLParameters(rabbitmq_url) ) self.channel self.connection.channel() self.channel.queue_declare(queuexhs_tasks, durableTrue) def publish_search_task(self, keyword: str, pages: int 5): 发布搜索任务到消息队列 task { action: search, keyword: keyword, pages: pages, timestamp: time.time() } self.channel.basic_publish( exchange, routing_keyxhs_tasks, bodyjson.dumps(task), propertiespika.BasicProperties( delivery_mode2, # 持久化消息 ) ) def start_consumer(self): 
```python
    def start_consumer(self):
        """启动任务消费者"""
        def callback(ch, method, properties, body):
            task = json.loads(body)
            if task["action"] == "search":
                self._process_search_task(task)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue="xhs_tasks",
            on_message_callback=callback
        )
        print("开始消费xhs任务...")
        self.channel.start_consuming()
```

## 总结

xhs项目作为小红书数据采集的专业解决方案,在技术架构、性能优化和生态集成方面都展现了极高的成熟度。通过深入分析其核心设计,我们可以看到:

- 架构设计的合理性:分层设计、模块化组织、完善的错误处理
- 性能优化的全面性:请求频率控制、缓存策略、并发处理
- 生态集成的灵活性:支持多种部署方式、易于集成到现有系统

对于需要从小红书平台获取数据的技术团队,xhs提供了从基础数据采集到企业级部署的完整解决方案。其开源特性也使得开发者可以根据具体业务需求进行定制化开发,真正实现了"开箱即用、深度可定制"的技术目标。

在实际应用中,建议团队根据具体业务场景选择合适的部署策略和优化方案,平衡数据采集效率与系统稳定性,确保在合规的前提下最大化数据价值。

【免费下载链接】xhs:基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/
项目地址: https://gitcode.com/gh_mirrors/xh/xhs

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

更多文章