项目资源链接


根基类 (Kernel Core)

  • SubTask

    • 所有任务的基类。定义了 dispatch() (开始执行) 和 done() (完成回调) 接口。

五大派生类 (Kernel Request Layer)

Workflow 在内核层将任务分为了 5 种基本行为模式:

  1. CommRequest (通信任务)

    • 用于网络通信。

    • 派生类: WFNetworkTask<REQ, RESP> (所有网络任务的基类,如 HTTP, Redis, MySQL 等任务)。

  2. ExecRequest (计算任务)

    • 用于线程池计算。

    • 派生类:

      • WFThreadTask<INPUT, OUTPUT>: 通用计算任务。

      • WFGoTask: 类似 Go 语言协程的简单计算任务。

      • WFSortTask, WFMergeTask 等: 在 WFAlgoTaskFactory.h 中定义,它们本质上是 WFThreadTask 的别名 (typedef)。

  3. IORequest (文件IO任务)

    • 用于异步文件操作(Linux AIO)。

    • 派生类: WFFileTask<ARGS> (文件读写任务)。

  4. SleepRequest (休眠任务)

    • 用于定时器。

    • 派生类: WFTimerTask (定时器任务)。

  5. ParallelTask (并行任务)

    • 用于管理子任务的并行执行。

    • 派生类:

      • ParallelWork: Workflow::create_parallel_work 创建的对象。

      • WFModuleTask: 用于封装复杂的模块化任务流。

通用任务 (Generic Tasks)

为了实现复杂的控制流(如计数、条件选择、图调度),工厂层引入了 WFGenericTask

  • WFGenericTask (直接继承自 SubTask)

    • 这是很多逻辑控制任务的基类。

    • 派生类:

      • WFCounterTask: 计数器任务,用于等待 N 个事件。

        • WFGraphNode: 图任务的节点,继承自 WFCounterTask,利用计数器机制实现依赖等待。
      • WFMailboxTask: 邮箱任务,用于跨线程传递数据。

      • WFSelectorTask: 选择器任务,从多个消息中选择一个。

      • WFConditional: 条件任务,根据条件动态修改任务流。

      • WFRepeaterTask: 重复任务,用于递归或循环生成任务。

      • WFGraphTask: 图任务容器,管理一张 DAG 图。

WaitGroup (等待组)

这是最常用的工具,类似于 Go 语言的 sync.WaitGroup

  • 功能:用于等待一组异步任务完成。它维护一个内部计数器。

  • 核心接口

    • WaitGroup(int n):构造函数,初始化计数器为 n(表示需要等待 n 个任务)。

    • done():计数器减 1。通常在 Task 的 callback 中调用。

    • wait():阻塞当前线程,直到计数器变为 0。

  • 典型场景

    • 主线程发起 10 个并行抓取任务,然后调用 wait() 阻塞。

    • 每个任务的回调函数里调用 done()

    • 当所有任务完成后,主线程解除阻塞,继续向下执行(如程序退出)。

CountDownLatch (倒计时门闩)

WaitGroup 非常相似,但在语义上更强调“门闩”或“开关”的作用。

  • 功能:让一个或多个线程等待,直到其他线程完成了一组操作(计数归零)。

  • 核心接口

    • count_down():计数减 1。

    • wait():等待计数归零。

  • 区别

    • 虽然功能上和 WaitGroup 重叠,但 CountDownLatch 通常用于 “多等一”“一等多” 的一次性触发场景。例如,多个工作线程启动后调用 wait() 等待主线程的一个“开始”信号(计数为1,主线程 count_down 后所有工作线程同时开跑)。
  • 超时控制

    • wait() 接口通常支持传入超时时间(如 wait(timeout)),如果在指定时间内计数未归零,则返回超时状态。这对于防止主线程永久死锁非常重要。

Conditional (条件变量封装)

这是一个用于更复杂的 线程间通知条件同步 的工具。

  • 功能:类似于 C++ 标准库的 std::condition_variable,但封装得更易用,通常结合了 mutex。用于在一个线程中等待某个特定条件满足(由另一个线程通知)。

  • 核心接口

    • signal():发送信号,唤醒一个正在等待的线程。

    • broadcast()(可能包含):唤醒所有等待线程。

    • wait():阻塞等待,直到收到信号。

  • 典型场景

    • 生产者-消费者模型:消费者线程调用 wait() 等待数据;生产者线程生成数据后调用 signal() 通知消费者。

    • 条件写入:您提到的“写同步等待”可能指等待缓冲区有空间写入,或者等待某个配置加载完成才允许写入。

核心计算调度

src/kernel/Executor.cc + Executor.h

目标: 彻底搞懂线程池模型及 CPU 密集型任务与网络 IO 的解耦。

源码解析

  • 线程池模型 (thrdpool):

    • Workflow 底层使用了一个 C 语言实现的线程池 thrdpool (见 Executor::init)。

    • 核心机制:它不同于简单的“一个任务一个线程”。Executor 维护了一个 ExecQueue 链表。

  • ExecQueueExecSession:

    • ExecQueue 是一个锁保护的链表,用于存放待执行的 ExecSession (即任务)。

    • request 方法将任务加入队列,并调用 thrdpool_scheduleexecutor_thread_routine 调度到线程池中运行。

    • 关键点Executor 本身不执行业务逻辑,它只是将“执行任务”这个动作调度给底层的 thrdpool

解决生产问题:为什么 CPU 密集任务不会卡网络?

这是 Workflow 双引擎架构的核心:

  1. 网络引擎 (Communicator):拥有独立的线程组 (Poller Threads),只负责处理 epoll 事件(读写 socket、处理连接)。

  2. 计算引擎 (Executor):拥有独立的线程组 (Compute Threads)。

  3. 解耦:当你发起一个 WFGoTask 或算法任务时,该任务被封装为 ExecRequest 扔给了 Executor。此时,网络线程立即释放去处理下一个 socket 事件,而计算任务在计算线程池中排队运行。

    • 结论:即使计算线程 100% 满载,网络线程依然能以微秒级响应 socket 事件(如心跳、握手),保证服务不死。

网络核心

src/kernel/Communicator.cc

目标: 理解网络 IO 全流程,掌握自定义协议的基础。

源码解析

Workflow 的网络心脏,管理着 socket 的生命周期。

  • request 流程:

    1. 查找连接:调用 request_idle_conn 尝试复用空闲连接。

    2. 创建连接:如果无空闲,调用 request_new_conn -> launch_conn 创建非阻塞 socket 并发起 connect。

    3. 注册事件:将 fd 注册到 mpoller (epoll 的封装),等待 PD_OP_CONNECTPD_OP_WRITE 事件。

  • handle_read_result (IO 处理):

    • 当 epoll 返回读事件,Communicator 会不断从 socket 读取数据,并调用 protocol::append (在 msgqueue 获取的消息中定义) 来解析协议。

    • 状态机:维护 CONN_STATE_IDLE, CONN_STATE_RECEIVING, CONN_STATE_SUCCESS 等状态,确保协议解析的原子性。

  • IOService (磁盘 IO):

    • 代码中包含 handle_aio_result。在 Linux 下,它对接内核 AIO (io_submit);在其他平台,它使用线程池模拟 AIO。这使得磁盘 IO 也变成了异步事件,统一由 Communicator 调度。

解决生产问题:写自定义协议与代理

  • MySQL/Redis 代理:通过阅读 handle_incoming_requesthandle_reply_result,你可以理解数据是如何被“切分”成一条条完整的消息的。

  • 实现 RPC:只需继承 ProtocolMessage 实现 encode/decode,然后让 Communicator 负责搬运字节流。你不需要管 epoll 的细节,只需关注协议解析。


高性能通信桥梁

:src/kernel/msgqueue.h (.c)

目标: 理解 Poller 线程与 Handler 线程的高效通信(双缓冲队列)。

源码解析

这是 Workflow 性能极其强悍的秘密武器之一。

  • 双向/双缓冲队列:

    • 结构体 __msgqueue 包含 put_head (生产端) 和 get_head (消费端)。

    • 核心逻辑 (msgqueue_get):消费者只从 get_head 取。如果 get_head 为空,则交换 get_headput_head 指针 (需加锁),将生产堆积的数据一次性“倒”过来。
      其核心工作流程,也就是你提到的 msgqueue_get函数,如下所示:

      1. 常态(无锁操作):在理想情况下,生产者持续向 put_head队列添加数据,消费者则从 get_head队列取出数据。由于两个线程操作的是不同的队列,因此完全不需要加锁,实现了极高的并发性
      2. 触发交换(临界点):当消费者将 get_head队列中的数据全部取完时,就需要进行交换操作。此时,消费者线程会尝试获取一把锁
      3. 执行交换(加锁同步):在锁的保护下,消费者检查 put_head队列是否为空。
        • 如果 put_head非空:这意味着生产者已经积累了一批新数据。消费者通过一个极其快速的操作——直接交换 put_head和 get_head两个指针的值——将满载数据的 put_head队列“变成”新的 get_head,同时将空的 get_head队列“变成”新的 put_head。这个过程只是指针赋值,速度极快,锁持有的时间非常短。
        • 如果 put_head也为空:则没有数据可消费,消费者根据设计可能等待或返回空值。
      4. 恢复常态:交换完成后,消费者立即释放锁。系统又回到了初始状态:生产者向新的(通常是空的)put_head写入,消费者从新的(满载数据的)get_head读取,再次进入无锁并行模式。
    • 优势:在非空且不需要交换指针时,消费者完全无锁。只有在队列空时才发生锁竞争。
      ⚠️ 潜在局限与考量
      当然,这种设计也非万能,需要根据具体需求权衡:

      • 交换时刻的延迟峰值:虽然交换操作很快,但如果生产者在此期间积累了海量数据,消费者在交换后需要处理一整批数据,可能会导致单次处理时间变长,产生延迟毛刺。
      • 内存占用:本质上是一种“空间换时间”的策略,需要维护两个缓冲区
      • 复杂性增加:相比简单的互斥锁保护的整体队列,其实现逻辑更复杂,需要仔细处理边界条件(如交换时的状态判断)。
      • 不适合多生产者:如果存在多个生产者线程同时向 put_head写入,那么在写入时仍然需要对 put_head加锁,这会削弱其无锁写入的优势。通常需要配合其他无锁编程技术或限制为单一生产者。

解决生产问题:手写高性能 RPC

  • SRPC (Sogou RPC) 正是基于此机制。在 RPC 场景下,网络线程(生产者)疯狂收包,业务线程(消费者)疯狂处理。双缓冲队列最小化了这两大类线程之间的锁冲突,使得 QPS 可以随 CPU 核数线性增长。

负载均衡与熔断

:src/manager/UpstreamManager.cc

目标: 实现比 Nginx 更灵活的进程内负载均衡。

源码解析

  • 策略管理: __UpstreamManager 是单例,管理所有 UPSGroupPolicy

  • 核心接口:

    • upstream_create_round_robin (轮询)

    • upstream_create_weighted_random (加权随机)

    • upstream_create_consistent_hash (一致性哈希)

  • 动态调整: 提供了 upstream_add_server, upstream_disable_server 等接口。这意味着你可以在程序运行时,动态地从配置中心(如 Etcd/Zookeeper)拉取配置并修改路由表,无需重启

解决生产问题:主备容灾与熔断

  • 熔断:虽然熔断逻辑主要在 Router 和 Policy 内部执行,但 UpstreamManager 提供了手动介入的能力。

  • 主备切换:通过 try_another 参数。如果在 UPSRoundRobinPolicy 中设置 try_another=true,当选中的节点连接失败或超时,框架会自动尝试组内的下一个节点。


路由与治理

:src/manager/RouteManager.cc

目标: 深入理解路由选择、DNS 缓存与连接池分组。

源码解析

  • 缓存结构 (RouteResultEntry):

    • 使用红黑树 (rbtree) 缓存路由结果。Key 由 (TransportType, Host, Port, Params) 组成。

    • 这意味着:即使是同一个域名,如果 SSL 参数不同或超时设置不同,也会被隔离在不同的连接池中。

  • 断路器 (check_breaker):

    • 每个路由入口维护一个 breaker_list (熔断列表)。

    • notify_unavailable: 当某个 Target 连续失败,将其移入熔断列表。

    • MTTR_SECOND (默认 30秒): 熔断恢复时间。30秒后会尝试重新将其加入可用组。

解决生产问题:服务发现与重试

  • 自定义服务发现:你可以实现自己的 NamingPolicy 注册到系统。当 DNS 解析或服务发现返回 IP 列表时,RouteManager 会根据 UpstreamManager 设定的策略(如一致性哈希)从中选择一个目标。

  • 连接复用:理解了 RouteManager 就知道,只要请求的 key 不变,Workflow 就会自动复用连接(Keep-Alive)。


HTTP 服务实现

:src/server/WFHttpServer.h (+ WFServer.cc)

目标: 写出高性能 HTTP 服务。

源码解析

  • 继承体系:

    • WFHttpServer 继承自 WFServer<HttpRequest, HttpResponse>

    • WFServer 继承自 WFServerBase,后者管理 listen_fdscheduler (即 Communicator)。

  • new_session (核心工厂方法):

    • 当有新连接上的数据到达时,WFServer::new_session 被调用。

    • 它创建一个 WFHttpTask,并将用户传入的 process 回调函数绑定到这个 Task 上。

  • 启动流程:

    • start() -> Notesen_fd -> scheduler->bind()。将监听端口绑定到 Communicatormpoller 上。

解决生产问题:单机 20w+ QPS

  • 全异步处理:在 process 回调中,不要做阻塞操作(如 sleep 或同步 MySQL 查询)。应该生成新的 WFMySQLTaskWFHttpTask,利用 series 串联。

  • 内存零拷贝HttpMessage 的解析使用了零拷贝技术。

  • 连接管理WFServer 自动处理 Keep-Alive 和连接超时 (params.keep_alive_timeout),在高并发下快速回收死链,复用活链。

图任务引擎

workflow/src/factory/WFGraphTask.cc

核心功能:实现有向无环图 (DAG) 的任务调度。

源码解析

  • 节点机制 (WFGraphNode)

    • 继承关系:虽然 .cc 文件没有显示头文件,但从 node->WFCounterTask::count() 可以看出,WFGraphNode 继承自 WFCounterTask

    • 触发机制:神奇之处在于 WFGraphNode::~WFGraphNode() 析构函数。当一个节点执行完毕(done())并被销毁时,它会遍历 this->successors(后继节点列表),并对每个后继节点调用 count()

    • 依赖管理WFCounterTask 的特性是:内部计数器减到 0 时自动触发执行。因此,如果节点 B 依赖节点 A,B 的初始计数器为 1。A 完成 -> A 析构 -> B.count() -> B 计数器归零 -> B 执行。

  • 图任务容器 (WFGraphTask)

    • 它维护了一个 ParallelWork 指针 (this->parallel)。

    • create_graph_node:每创建一个节点,其实是创建了一个 SeriesWork(包含该节点),并将其加入到 ParallelWork 中。

    • Dispatch:图任务启动时,实际上是启动内部那个包含所有节点的 ParallelWork。所有入度为 0 的节点(计数器为 0)会立即开始运行。

生产价值:推荐系统与朋友圈

  • 推荐系统:在一个请求中,你需要并发调用“召回”、“粗排”、“精排”等服务。精排依赖粗排,粗排依赖召回。使用 WFGraphTask,你可以构建一个静态的图结构,Workflow 会自动处理这种复杂的依赖并行,最大化利用 CPU。

  • 朋友圈/Feed流:并发拉取用户信息、图片信息、评论信息,最后聚合。


命名服务与服务发现

workflow/src/nameservice/WFNameService.cc`

核心功能:提供通用的服务命名与路由策略管理。

源码解析

  • 红黑树存储 (rbtree)

    • WFNameService 内部维护了一颗红黑树 (this->root) 来存储策略 (WFNSPolicy),Key 是服务名字符串。

    • 使用 pthread_rwlock 读写锁保护,保证高并发下的查询性能。

  • 策略模式 (WFNSPolicy)

    • add_policy(name, policy):将一个字符串名字(如 “redis_service”)绑定到一个策略对象上。

    • 解耦:当客户端发起请求时,只需要提供名字。WFNameService 根据名字找到策略,策略再负责返回具体的 Address(IP+Port)。

生产价值:自定义服务发现 (Consul/Etcd/K8s)

  • 实现机制:Workflow 默认支持 DNS 和本地 hosts。但如果你用 Consul 或 Nacos,你可以实现一个自定义的 WFNSPolicy

  • 动态路由:在这个策略类里,你可以定时从 Consul 拉取最新的服务 IP 列表。当 Workflow 里的 Task 需要连接 “MyService” 时,你的策略会返回最新的健康节点 IP。这让 Workflow 能够无缝融入微服务架构。


DNS 客户端

workflow/src/client/WFDnsClient.cc

核心功能:高性能、可配置的异步 DNS 解析。

源码解析

  • 复杂参数控制 (DnsParams)

    • 支持 /etc/resolv.conf 中的高级配置,如 search_list(搜索域)、ndots(点数阈值)、attempts(重试次数)、rotate(轮询 DNS 服务器)。
  • 状态机逻辑 (__callback_internal)

    • 这是一个非常典型的异步状态机。当 DNS 请求返回(done)时,检查 rcode

    • 故障转移:如果 SERVER_FAILURE,状态机逻辑会递增 next_server,修改 ctask->set_redirect 尝试下一个 DNS 服务器。

    • 搜索域拼接:如果 NAME_ERROR(域名不存在),且配置了 search_list,它会自动拼接后缀(如 .svc.cluster.local)再次发起查询。

生产价值:自定义 DNS 负载均衡

  • 高可用:在生产环境中,DNS 抖动是常见问题。Workflow 的 DNS Client 内置了完善的重试和轮询机制,确保 DNS 解析的高可用性。

  • K8s 适配:对 ndots and search 的支持使得它能完美运行在 Kubernetes 环境中,正确解析 Service 短域名。


URI 解析核心

workflow/src/util/URIParser.cc

核心功能:快速解析 URL 各个部分(Scheme, Host, Port, Path, Query 等)。

源码解析

  • 查表法优化 (Table-Driven)

    • 代码中定义了巨大的 valid_charauthority_map 数组。

    • 零分支预测:通过 authority_map[(unsigned char)str[i]] 直接判断字符类型,避免了大量的 if-elseswitch-case 跳转。这是高性能 Parser 的标准写法。

  • 状态机解析 (parse)

    • 解析器维护了 pre_statestart_idx/end_idx

    • 不进行内存拷贝来生成中间字符串,而是记录各个部分的偏移量 (start_idx, end_idx)。只有在需要输出结果时(如 ParsedURI 构造),才进行必要的 strduprealloc

生产价值:反向代理 (Reverse Proxy)

  • 网关基石:写 API 网关或反向代理(如 Nginx 替代品)时,必须解析进来的 URL 才能决定转发给哪个后端服务。

  • 性能关键:每个请求都要跑一遍这个逻辑,查表法带来的 CPU 节省在 10万+ QPS 下非常可观。


HTTP 协议解析

workflow/src/protocol/http_parser.c

核心功能:极速解析 HTTP 请求和响应报文。

源码解析

  • 状态机设计

    • 定义了 HPS_START_LINE, HPS_HEADER_NAME, HPS_HEADER_VALUE 等状态。

    • 这是一个流式解析器 (append_message)。它不需要收到完整包才开始解,而是来一段 buffer 解析一段。这对于处理大文件上传或慢网络连接至关重要。

  • 增量解析 (http_parser_append_message)

    • 它维护一个 msgbuf。当新数据到达,直接 memcpy 到 buffer 末尾,然后从上次的 header_offset 继续解析。
  • Chunked 编码支持

    • 内置了 __parse_chunk 函数。对于 HTTP/1.1 的 Chunked 传输,它能自动识别 Chunk Size 并提取数据,对上层透明。

生产价值:为什么比 libevent 快?

  • C 语言极致优化:完全基于指针操作和内存偏移,几乎没有多余的对象创建开销。

  • 零拷贝思想:在解析过程中,它尽量复用接收到的 buffer。Header 的解析使用了链表 __header_line,但 value 的存储往往只是指针指向 buffer 中的位置(如果空间足够),避免了大量的字符串拷贝。

  • 紧凑内存http_parser_t 结构体非常紧凑,且 msgbuf 支持动态扩容,既节省内存又能处理超大包。