019、微服务架构初探:使用FastAPI构建与通信

张开发
2026/4/7 9:22:21 15 分钟阅读

分享文章

019、微服务架构初探:使用FastAPI构建与通信
019、微服务架构初探使用FastAPI构建与通信昨天深夜排查一个线上问题服务A调用服务B的接口超时日志里全是ConnectionTimeout。查了半天发现是服务B的实例数不够但更根本的问题是——两个服务间的通信协议选型时没考虑超时熔断。这个坑让我决定专门写一篇聊聊用FastAPI搞微服务时那些实战中才会遇到的事儿。微服务不是银弹很多人一上来就把单体拆成十几个微服务结果运维复杂度指数级上升。我见过最离谱的架构一个用户注册流程要经过五个服务每个服务都有自己的数据库链路追踪像一团乱麻。微服务的核心价值是独立部署和弹性伸缩如果业务本身耦合紧密硬拆只会增加痛苦。用FastAPI做微服务有个天然优势它的异步支持真好用。当服务需要并行调用多个下游时async/await模式比线程池清爽太多。但注意如果你的服务主要是CPU密集型计算异步带来的收益有限。服务间通信的三种姿势HTTP通信简单但需要谨慎# services/user_service.pyimporthttpxfromfastapiimportFastAPI,HTTPException appFastAPI()# 错误示范每次调用都创建新client连接池没复用asyncdefget_order_bad(user_id:int):asyncwithhttpx.AsyncClient()asclient:# 这里踩过坑responseawaitclient.get(fhttp://order-service/orders/{user_id})returnresponse.json()# 正确姿势复用client实例classOrderServiceClient:def__init__(self):self.clienthttpx.AsyncClient(base_urlhttp://order-service,timeout30.0,# 超时设置必须要有limitshttpx.Limits(max_connections100)# 连接池限制)asyncdefget_orders(self,user_id:int):try:responseawaitself.client.get(f/orders/{user_id})response.raise_for_status()returnresponse.json()excepthttpx.TimeoutException:# 超时要有降级策略return{fallback:True,orders:[]}order_clientOrderServiceClient()app.get(/users/{user_id}/profile)asyncdefuser_profile(user_id:int):# 并行调用多个下游服务user_data,order_dataawaitasyncio.gather(get_user_from_db(user_id),order_client.get_orders(user_id)# 用封装好的client)return{**user_data,orders:order_data}HTTP通信最容易被忽视的是超时设置。我建议生产环境设置两层超时连接超时3-5秒和读取超时10-30秒。更关键的是要有熔断机制下游连续失败时快速失败避免雪崩。gRPC性能要求高时的选择当服务间数据交换频繁或需要双向流时gRPC比HTTP/1.1高效得多。FastAPI通过grpcio和protobuf可以很好支持。# protobuf定义的消息结构# user.proto syntax proto3; message UserRequest { int32 user_id 1; } message UserProfile { int32 user_id 1; string name 2; repeated string tags 3; # 重复字段对应List } # client.pyimportgrpcfromuser_pb2importUserRequestfromuser_pb2_grpcimportUserServiceStub channelgrpc.aio.insecure_channel(user-service:50051,options[(grpc.keepalive_time_ms,10000),# 保活设置很重要(grpc.max_reconnect_backoff_ms,5000)])stubUserServiceStub(channel)asyncdefget_user_grpc(user_id:int):try:responseawaitstub.GetUser(UserRequest(user_iduser_id))return{name:response.name,tags:list(response.tags)}exceptgrpc.aio.AioRpcErrorase:ife.code()grpc.StatusCode.DEADLINE_EXCEEDED:# gRPC超时处理returnfallback_datagRPC的坑主要在部署。需要确保服务发现机制Consul或K8s DNS工作正常而且不同服务的proto文件版本要同步。建议在CI/CD流水线中加入proto编译检查。消息队列解耦利器对于非实时性操作比如发送邮件、生成报表用消息队列能显著提升系统韧性。# services/notification_service.pyimportasynciofromaiokafkaimportAIOKafkaConsumerfromfastapiimportFastAPI appFastAPI()asyncdefconsume_messages():consumerAIOKafkaConsumer(user_registered,bootstrap_serverskafka:9092,group_idnotification_group)awaitconsumer.start()# 别在消息处理里抛异常还不捕获asyncformsginconsumer:try:user_datajson.loads(msg.value)awaitsend_welcome_email(user_data[email])awaitconsumer.commit()# 手动提交偏移量exceptExceptionase:# 死信队列或重试逻辑必须有awaitsend_to_dlq(msg.value,str(e))logger.error(f处理消息失败:{e})app.on_event(startup)asyncdefstartup_event():asyncio.create_task(consume_messages())# 后台启动消费者消息队列最怕消息丢失和重复消费。一定要实现幂等性处理比如通过消息ID去重。Kafka的enable.auto.commit建议设为False手动提交偏移量更可控。服务发现的实战经验开发环境用Docker Compose时可以直接用服务名通信。但上生产后服务发现就成了必须品。# docker-compose.yml 片段services:user-service:build:./userports:-8000:8000# 开发环境直接用服务名# 生产环境需要更复杂的方案order-service:build:./order# 在代码里可以通过 http://user-service:8000 访问生产环境我推荐两种方案如果用K8s内置的DNS服务发现足够好用如果是虚拟机部署配合ConsulConsul-Template自动更新Nginx配置。别手写IP地址到配置文件里那是十年前的做法。调试微服务的私人工具箱链路追踪OpenTelemetryJaeger组合。在FastAPI里加几行代码就能接入fromopentelemetry.instrumentation.fastapiimportFastAPIInstrumentor FastAPIInstrumentor.instrument_app(app)健康检查端点每个服务都要有app.get(/health)asyncdefhealth_check():# 检查数据库连接、下游依赖状态db_okawaitcheck_database()redis_okawaitcheck_redis()return{db:db_ok,redis:redis_ok}ifall([db_ok,redis_ok])elseHTTPException(503)结构化日志用JSON格式输出日志方便ELK收集importstructlog loggerstructlog.get_logger()asyncdefsome_function():logger.info(user_action,user_iduser_id,actionlogin,iprequest.client.host)# 输出: {event: user_action, user_id: 123, action: login, ip: 192.168.1.1}写给准备拆微服务的你微服务架构像乐高拆得越细组装越复杂。我的经验是先按业务能力边界拆分而不是按技术层次。用户管理、订单处理、库存管理这些天然就是独立的业务单元。刚开始别急着上各种复杂工具。先用HTTP通信配合简单的服务发现甚至初期可以用硬编码配置。等真正遇到性能瓶颈或部署瓶颈时再逐步引入gRPC、消息队列。监控一定要从第一天就建设。微服务的问题往往不是“某个服务挂了”而是“某个服务变慢了”引发的连锁反应。APM工具的钱不要省它帮你省下的加班费远超过license费用。最后记住微服务的终极目标不是技术炫技而是让团队能独立、快速、安全地交付价值。如果架构变得只有少数几个人能维护那一定是哪里出了问题。保持简单适时重构这才是工程的艺术。

更多文章