Workflow库文件学习
项目资源链接
- Workflow 官方项目:workflow
- 个人 Workflow 项目 (Gateway):workflow-gateway
根基类 (Kernel Core)
SubTask- 所有任务的基类。定义了
dispatch()(开始执行) 和done()(完成回调) 接口。
- 所有任务的基类。定义了
五大派生类 (Kernel Request Layer)
Workflow 在内核层将任务分为了 5 种基本行为模式:
CommRequest(通信任务)用于网络通信。
派生类:
WFNetworkTask<REQ, RESP>(所有网络任务的基类,如 HTTP, Redis, MySQL 等任务)。
ExecRequest(计算任务)用于线程池计算。
派生类:
WFThreadTask<INPUT, OUTPUT>: 通用计算任务。WFGoTask: 类似 Go 语言协程的简单计算任务。WFSortTask,WFMergeTask等: 在WFAlgoTaskFactory.h中定义,它们本质上是WFThreadTask的别名 (typedef)。
IORequest(文件IO任务)用于异步文件操作(Linux AIO)。
派生类:
WFFileTask<ARGS>(文件读写任务)。
SleepRequest(休眠任务)用于定时器。
派生类:
WFTimerTask(定时器任务)。
ParallelTask(并行任务)用于管理子任务的并行执行。
派生类:
ParallelWork:Workflow::create_parallel_work创建的对象。WFModuleTask: 用于封装复杂的模块化任务流。
通用任务 (Generic Tasks)
为了实现复杂的控制流(如计数、条件选择、图调度),工厂层引入了 WFGenericTask。
WFGenericTask(直接继承自SubTask)这是很多逻辑控制任务的基类。
派生类:
WFCounterTask: 计数器任务,用于等待 N 个事件。WFGraphNode: 图任务的节点,继承自WFCounterTask,利用计数器机制实现依赖等待。
WFMailboxTask: 邮箱任务,用于跨线程传递数据。WFSelectorTask: 选择器任务,从多个消息中选择一个。WFConditional: 条件任务,根据条件动态修改任务流。WFRepeaterTask: 重复任务,用于递归或循环生成任务。WFGraphTask: 图任务容器,管理一张 DAG 图。
WaitGroup (等待组)
这是最常用的工具,类似于 Go 语言的 sync.WaitGroup。
功能:用于等待一组异步任务完成。它维护一个内部计数器。
核心接口:
WaitGroup(int n):构造函数,初始化计数器为n(表示需要等待n个任务)。done():计数器减 1。通常在 Task 的callback中调用。wait():阻塞当前线程,直到计数器变为 0。
典型场景:
主线程发起 10 个并行抓取任务,然后调用
wait()阻塞。每个任务的回调函数里调用
done()。当所有任务完成后,主线程解除阻塞,继续向下执行(如程序退出)。
CountDownLatch (倒计时门闩)
与 WaitGroup 非常相似,但在语义上更强调“门闩”或“开关”的作用。
功能:让一个或多个线程等待,直到其他线程完成了一组操作(计数归零)。
核心接口:
count_down():计数减 1。wait():等待计数归零。
区别:
- 虽然功能上和
WaitGroup重叠,但CountDownLatch通常用于 “多等一” 或 “一等多” 的一次性触发场景。例如,多个工作线程启动后调用wait()等待主线程的一个“开始”信号(计数为1,主线程count_down后所有工作线程同时开跑)。
- 虽然功能上和
超时控制:
wait()接口通常支持传入超时时间(如wait(timeout)),如果在指定时间内计数未归零,则返回超时状态。这对于防止主线程永久死锁非常重要。
Conditional (条件变量封装)
这是一个用于更复杂的 线程间通知 和 条件同步 的工具。
功能:类似于 C++ 标准库的
std::condition_variable,但封装得更易用,通常结合了mutex。用于在一个线程中等待某个特定条件满足(由另一个线程通知)。核心接口:
signal():发送信号,唤醒一个正在等待的线程。broadcast()(可能包含):唤醒所有等待线程。wait():阻塞等待,直到收到信号。
典型场景:
生产者-消费者模型:消费者线程调用
wait()等待数据;生产者线程生成数据后调用signal()通知消费者。条件写入:您提到的“写同步等待”可能指等待缓冲区有空间写入,或者等待某个配置加载完成才允许写入。
核心计算调度
src/kernel/Executor.cc + Executor.h
目标: 彻底搞懂线程池模型及 CPU 密集型任务与网络 IO 的解耦。
源码解析
线程池模型 (
thrdpool):Workflow 底层使用了一个 C 语言实现的线程池
thrdpool(见Executor::init)。核心机制:它不同于简单的“一个任务一个线程”。
Executor维护了一个ExecQueue链表。
ExecQueue与ExecSession:ExecQueue是一个锁保护的链表,用于存放待执行的ExecSession(即任务)。request方法将任务加入队列,并调用thrdpool_schedule将executor_thread_routine调度到线程池中运行。关键点:
Executor本身不执行业务逻辑,它只是将“执行任务”这个动作调度给底层的thrdpool。
解决生产问题:为什么 CPU 密集任务不会卡网络?
这是 Workflow 双引擎架构的核心:
网络引擎 (
Communicator):拥有独立的线程组 (Poller Threads),只负责处理epoll事件(读写 socket、处理连接)。计算引擎 (
Executor):拥有独立的线程组 (Compute Threads)。解耦:当你发起一个
WFGoTask或算法任务时,该任务被封装为ExecRequest扔给了Executor。此时,网络线程立即释放去处理下一个 socket 事件,而计算任务在计算线程池中排队运行。- 结论:即使计算线程 100% 满载,网络线程依然能以微秒级响应 socket 事件(如心跳、握手),保证服务不死。
网络核心
src/kernel/Communicator.cc
目标: 理解网络 IO 全流程,掌握自定义协议的基础。
源码解析
Workflow 的网络心脏,管理着 socket 的生命周期。
request流程:查找连接:调用
request_idle_conn尝试复用空闲连接。创建连接:如果无空闲,调用
request_new_conn->launch_conn创建非阻塞 socket 并发起 connect。注册事件:将 fd 注册到
mpoller(epoll 的封装),等待PD_OP_CONNECT或PD_OP_WRITE事件。
handle_read_result(IO 处理):当 epoll 返回读事件,
Communicator会不断从 socket 读取数据,并调用protocol::append(在msgqueue获取的消息中定义) 来解析协议。状态机:维护
CONN_STATE_IDLE,CONN_STATE_RECEIVING,CONN_STATE_SUCCESS等状态,确保协议解析的原子性。
IOService (磁盘 IO):
- 代码中包含
handle_aio_result。在 Linux 下,它对接内核 AIO (io_submit);在其他平台,它使用线程池模拟 AIO。这使得磁盘 IO 也变成了异步事件,统一由Communicator调度。
- 代码中包含
解决生产问题:写自定义协议与代理
MySQL/Redis 代理:通过阅读
handle_incoming_request和handle_reply_result,你可以理解数据是如何被“切分”成一条条完整的消息的。实现 RPC:只需继承
ProtocolMessage实现encode/decode,然后让Communicator负责搬运字节流。你不需要管 epoll 的细节,只需关注协议解析。
高性能通信桥梁
:src/kernel/msgqueue.h (.c)
目标: 理解 Poller 线程与 Handler 线程的高效通信(双缓冲队列)。
源码解析
这是 Workflow 性能极其强悍的秘密武器之一。
双向/双缓冲队列:
结构体
__msgqueue包含put_head(生产端) 和get_head(消费端)。核心逻辑 (
msgqueue_get):消费者只从get_head取。如果get_head为空,则交换get_head和put_head指针 (需加锁),将生产堆积的数据一次性“倒”过来。
其核心工作流程,也就是你提到的msgqueue_get函数,如下所示:- 常态(无锁操作):在理想情况下,生产者持续向
put_head队列添加数据,消费者则从get_head队列取出数据。由于两个线程操作的是不同的队列,因此完全不需要加锁,实现了极高的并发性 - 触发交换(临界点):当消费者将
get_head队列中的数据全部取完时,就需要进行交换操作。此时,消费者线程会尝试获取一把锁 - 执行交换(加锁同步):在锁的保护下,消费者检查
put_head队列是否为空。- 如果
put_head非空:这意味着生产者已经积累了一批新数据。消费者通过一个极其快速的操作——直接交换put_head和get_head两个指针的值——将满载数据的put_head队列“变成”新的get_head,同时将空的get_head队列“变成”新的put_head。这个过程只是指针赋值,速度极快,锁持有的时间非常短。 - 如果
put_head也为空:则没有数据可消费,消费者根据设计可能等待或返回空值。
- 如果
- 恢复常态:交换完成后,消费者立即释放锁。系统又回到了初始状态:生产者向新的(通常是空的)
put_head写入,消费者从新的(满载数据的)get_head读取,再次进入无锁并行模式。
- 常态(无锁操作):在理想情况下,生产者持续向
优势:在非空且不需要交换指针时,消费者完全无锁。只有在队列空时才发生锁竞争。
⚠️ 潜在局限与考量
当然,这种设计也非万能,需要根据具体需求权衡:- 交换时刻的延迟峰值:虽然交换操作很快,但如果生产者在此期间积累了海量数据,消费者在交换后需要处理一整批数据,可能会导致单次处理时间变长,产生延迟毛刺。
- 内存占用:本质上是一种“空间换时间”的策略,需要维护两个缓冲区
- 复杂性增加:相比简单的互斥锁保护的整体队列,其实现逻辑更复杂,需要仔细处理边界条件(如交换时的状态判断)。
- 不适合多生产者:如果存在多个生产者线程同时向
put_head写入,那么在写入时仍然需要对put_head加锁,这会削弱其无锁写入的优势。通常需要配合其他无锁编程技术或限制为单一生产者。
解决生产问题:手写高性能 RPC
- SRPC (Sogou RPC) 正是基于此机制。在 RPC 场景下,网络线程(生产者)疯狂收包,业务线程(消费者)疯狂处理。双缓冲队列最小化了这两大类线程之间的锁冲突,使得 QPS 可以随 CPU 核数线性增长。
负载均衡与熔断
:src/manager/UpstreamManager.cc
目标: 实现比 Nginx 更灵活的进程内负载均衡。
源码解析
策略管理:
__UpstreamManager是单例,管理所有UPSGroupPolicy。核心接口:
upstream_create_round_robin(轮询)upstream_create_weighted_random(加权随机)upstream_create_consistent_hash(一致性哈希)
动态调整: 提供了
upstream_add_server,upstream_disable_server等接口。这意味着你可以在程序运行时,动态地从配置中心(如 Etcd/Zookeeper)拉取配置并修改路由表,无需重启。
解决生产问题:主备容灾与熔断
熔断:虽然熔断逻辑主要在 Router 和 Policy 内部执行,但
UpstreamManager提供了手动介入的能力。主备切换:通过
try_another参数。如果在UPSRoundRobinPolicy中设置try_another=true,当选中的节点连接失败或超时,框架会自动尝试组内的下一个节点。
路由与治理
:src/manager/RouteManager.cc
目标: 深入理解路由选择、DNS 缓存与连接池分组。
源码解析
缓存结构 (
RouteResultEntry):使用红黑树 (
rbtree) 缓存路由结果。Key 由(TransportType, Host, Port, Params)组成。这意味着:即使是同一个域名,如果 SSL 参数不同或超时设置不同,也会被隔离在不同的连接池中。
断路器 (
check_breaker):每个路由入口维护一个
breaker_list(熔断列表)。notify_unavailable: 当某个 Target 连续失败,将其移入熔断列表。MTTR_SECOND(默认 30秒): 熔断恢复时间。30秒后会尝试重新将其加入可用组。
解决生产问题:服务发现与重试
自定义服务发现:你可以实现自己的
NamingPolicy注册到系统。当 DNS 解析或服务发现返回 IP 列表时,RouteManager会根据UpstreamManager设定的策略(如一致性哈希)从中选择一个目标。连接复用:理解了
RouteManager就知道,只要请求的key不变,Workflow 就会自动复用连接(Keep-Alive)。
HTTP 服务实现
:src/server/WFHttpServer.h (+ WFServer.cc)
目标: 写出高性能 HTTP 服务。
源码解析
继承体系:
WFHttpServer继承自WFServer<HttpRequest, HttpResponse>。WFServer继承自WFServerBase,后者管理listen_fd和scheduler(即 Communicator)。
new_session(核心工厂方法):当有新连接上的数据到达时,
WFServer::new_session被调用。它创建一个
WFHttpTask,并将用户传入的process回调函数绑定到这个 Task 上。
启动流程:
start()->Notesen_fd->scheduler->bind()。将监听端口绑定到Communicator的mpoller上。
解决生产问题:单机 20w+ QPS
全异步处理:在
process回调中,不要做阻塞操作(如sleep或同步 MySQL 查询)。应该生成新的WFMySQLTask或WFHttpTask,利用series串联。内存零拷贝:
HttpMessage的解析使用了零拷贝技术。连接管理:
WFServer自动处理 Keep-Alive 和连接超时 (params.keep_alive_timeout),在高并发下快速回收死链,复用活链。
图任务引擎
:workflow/src/factory/WFGraphTask.cc
核心功能:实现有向无环图 (DAG) 的任务调度。
源码解析
节点机制 (
WFGraphNode):继承关系:虽然
.cc文件没有显示头文件,但从node->WFCounterTask::count()可以看出,WFGraphNode继承自WFCounterTask。触发机制:神奇之处在于
WFGraphNode::~WFGraphNode()析构函数。当一个节点执行完毕(done())并被销毁时,它会遍历this->successors(后继节点列表),并对每个后继节点调用count()。依赖管理:
WFCounterTask的特性是:内部计数器减到 0 时自动触发执行。因此,如果节点 B 依赖节点 A,B 的初始计数器为 1。A 完成 -> A 析构 -> B.count() -> B 计数器归零 -> B 执行。
图任务容器 (
WFGraphTask):它维护了一个
ParallelWork指针 (this->parallel)。create_graph_node:每创建一个节点,其实是创建了一个SeriesWork(包含该节点),并将其加入到ParallelWork中。Dispatch:图任务启动时,实际上是启动内部那个包含所有节点的
ParallelWork。所有入度为 0 的节点(计数器为 0)会立即开始运行。
生产价值:推荐系统与朋友圈
推荐系统:在一个请求中,你需要并发调用“召回”、“粗排”、“精排”等服务。精排依赖粗排,粗排依赖召回。使用
WFGraphTask,你可以构建一个静态的图结构,Workflow 会自动处理这种复杂的依赖并行,最大化利用 CPU。朋友圈/Feed流:并发拉取用户信息、图片信息、评论信息,最后聚合。
命名服务与服务发现
workflow/src/nameservice/WFNameService.cc`
核心功能:提供通用的服务命名与路由策略管理。
源码解析
红黑树存储 (
rbtree):WFNameService内部维护了一颗红黑树 (this->root) 来存储策略 (WFNSPolicy),Key 是服务名字符串。使用
pthread_rwlock读写锁保护,保证高并发下的查询性能。
策略模式 (
WFNSPolicy):add_policy(name, policy):将一个字符串名字(如 “redis_service”)绑定到一个策略对象上。解耦:当客户端发起请求时,只需要提供名字。
WFNameService根据名字找到策略,策略再负责返回具体的Address(IP+Port)。
生产价值:自定义服务发现 (Consul/Etcd/K8s)
实现机制:Workflow 默认支持 DNS 和本地 hosts。但如果你用 Consul 或 Nacos,你可以实现一个自定义的
WFNSPolicy。动态路由:在这个策略类里,你可以定时从 Consul 拉取最新的服务 IP 列表。当 Workflow 里的 Task 需要连接 “MyService” 时,你的策略会返回最新的健康节点 IP。这让 Workflow 能够无缝融入微服务架构。
DNS 客户端
:workflow/src/client/WFDnsClient.cc
核心功能:高性能、可配置的异步 DNS 解析。
源码解析
复杂参数控制 (
DnsParams):- 支持
/etc/resolv.conf中的高级配置,如search_list(搜索域)、ndots(点数阈值)、attempts(重试次数)、rotate(轮询 DNS 服务器)。
- 支持
状态机逻辑 (
__callback_internal):这是一个非常典型的异步状态机。当 DNS 请求返回(
done)时,检查rcode。故障转移:如果
SERVER_FAILURE,状态机逻辑会递增next_server,修改ctask->set_redirect尝试下一个 DNS 服务器。搜索域拼接:如果
NAME_ERROR(域名不存在),且配置了search_list,它会自动拼接后缀(如.svc.cluster.local)再次发起查询。
生产价值:自定义 DNS 负载均衡
高可用:在生产环境中,DNS 抖动是常见问题。Workflow 的 DNS Client 内置了完善的重试和轮询机制,确保 DNS 解析的高可用性。
K8s 适配:对
ndotsandsearch的支持使得它能完美运行在 Kubernetes 环境中,正确解析 Service 短域名。
URI 解析核心
:workflow/src/util/URIParser.cc
核心功能:快速解析 URL 各个部分(Scheme, Host, Port, Path, Query 等)。
源码解析
查表法优化 (
Table-Driven):代码中定义了巨大的
valid_char和authority_map数组。零分支预测:通过
authority_map[(unsigned char)str[i]]直接判断字符类型,避免了大量的if-else或switch-case跳转。这是高性能 Parser 的标准写法。
状态机解析 (
parse):解析器维护了
pre_state和start_idx/end_idx。它不进行内存拷贝来生成中间字符串,而是记录各个部分的偏移量 (
start_idx,end_idx)。只有在需要输出结果时(如ParsedURI构造),才进行必要的strdup或realloc。
生产价值:反向代理 (Reverse Proxy)
网关基石:写 API 网关或反向代理(如 Nginx 替代品)时,必须解析进来的 URL 才能决定转发给哪个后端服务。
性能关键:每个请求都要跑一遍这个逻辑,查表法带来的 CPU 节省在 10万+ QPS 下非常可观。
HTTP 协议解析
:workflow/src/protocol/http_parser.c
核心功能:极速解析 HTTP 请求和响应报文。
源码解析
状态机设计:
定义了
HPS_START_LINE,HPS_HEADER_NAME,HPS_HEADER_VALUE等状态。这是一个流式解析器 (
append_message)。它不需要收到完整包才开始解,而是来一段 buffer 解析一段。这对于处理大文件上传或慢网络连接至关重要。
增量解析 (
http_parser_append_message):- 它维护一个
msgbuf。当新数据到达,直接memcpy到 buffer 末尾,然后从上次的header_offset继续解析。
- 它维护一个
Chunked 编码支持:
- 内置了
__parse_chunk函数。对于 HTTP/1.1 的 Chunked 传输,它能自动识别 Chunk Size 并提取数据,对上层透明。
- 内置了
生产价值:为什么比 libevent 快?
C 语言极致优化:完全基于指针操作和内存偏移,几乎没有多余的对象创建开销。
零拷贝思想:在解析过程中,它尽量复用接收到的 buffer。Header 的解析使用了链表
__header_line,但 value 的存储往往只是指针指向 buffer 中的位置(如果空间足够),避免了大量的字符串拷贝。紧凑内存:
http_parser_t结构体非常紧凑,且msgbuf支持动态扩容,既节省内存又能处理超大包。