Blog

Blog

PHODAL

自动增量计算:构建高性能数据分析系统的任务编排

在起始的那篇《金融 Python 即服务:业务自助的数据服务模式》,我们介绍了:使用 Python 如何使用作为数据系统的 wrapper 层?在这一篇文章里,我们将继续之前的话题,介绍如何使用 Python 作为计算引擎核心的胶水层,即:如何使用 Python 构建 DAG(有向无环图,Directed Acyclic Graph) 任务?

除此,还可以了解一下,如何设计增量 DAG 计算?先看一下增量计算的概念:

增量计算(Incremental computing),是一种软件功能,每当一条数据发生更改时,它都会尝试通过仅重新计算依赖于更改数据的输出来节省时间。

常见的领域有:

  • GUI 应用, 诸如于 React 的 Dom Diff
  • 不断变化的大型计算,诸如于金融计算、电子表格、大数据系统
  • 构建系统,诸如于 Gradle、Bazel、Rustc 等

所以,在开始之前,让我们先看一个简单的例子,Excel 如何实现增量计算。

引子 1:Excel 的增量计算

众所周知,Excel 是使用最广泛的数据分析工具。当我们使用了 Excel 中的公式之后,当我们修改了 A 单元格的值,对应的结果会自动发生变化。而如果在这时,还有其它依赖于此单元格的值时,对应的结果也会发生变化。如下图所示:

出自 《How to Recalculate a Spreadsheet》

在 Microsoft 官方的文档里(Excel 重新计算),可以看到对应的触发重新计算场景:输入新数据、删除或插入行或列等等。在 Excel 中,工作表的计算可视为包含三个阶段的过程:

  1. 构造依赖关系树
  2. 构造计算链
  3. 重新计算单元格

一旦触发了重新计算,Excel 会重新构造依赖关系树和计算链,并依赖于此的所有单元格标记为 ”脏单元格“。随后,根据计算链指定的顺序重新计算。通常来说,在我们设计依赖分析时,假定的是函数是不可变的。但是呢,还存在一些特殊的函数类型,诸如于文档中提到的:

  • 异步函数 (UDF)。
  • 可变函数。即哪怕参数没有变化时,值也可能修改。诸如于 Now、Today 等。

这意味着,我们在设计增量计算时,需要考虑到这个场景的问题。从原理和实现来说,它一点并不算太复杂,有诸如于

从注解 DAG 到增量 DAG 设计

DAG (有向无环图,Directed Acyclic Graph)是一种常用数据结构,仅就 DAG 而言,它已经在我们日常的各种工具中存在:

  • 依赖系统。诸如如 NPM、Yarn、Gradle、Cargo 等
  • 人工智能。如机器学习等
  • 数据流系统。如编译器、Apache Spark、Apache Airflow 等。
  • 数据可视化。常用的 Graphviz,又或者是各个语言里的 Network 相关的库,诸如于 Python 的 NetworkX

当我们从任务编排和数据等的角度来看,DAG 的面向普通人术语是叫工作流(Workflow)。

常规 DAG 到函数式 DAG

通常情况下,实现一个 DAG 非常的简单 —— 只是数据结构。在使用时,也比较简单,如下是 Cytoscape 的 API 示例:

cy.add([
  { group: 'nodes', data: { id: 'n0' }, position: { x: 100, y: 100 } },
  { group: 'nodes', data: { id: 'n1' }, position: { x: 200, y: 200 } },
  { group: 'edges', data: { id: 'e0', source: 'n0', target: 'n1' } }
]);

而这一类 DAG 是静态的,当我们需要结合些任务时,就会需要添加函数。由此便会稍微复杂一些,再现看个示例:

comp = Computation()
comp.add_node('a')
comp.add_node('b', lambda a: a+1)
comp.add_node('c', lambda a, b: 2*a)
comp.add_node('d', lambda b, c: b + c)
comp.add_node('e', lambda c: c + 1)
comp.compute('d')
comp.get_value_dict()

上述的代码中,是 Loman 框架的示例,其中的 lambda a: a+1 是 Python 的 Lambda 表达式。Loman 会在运行时,分析这个 Lambda,获得 Lambda 中的参数,随后添加对应的计算依赖。

Loman 示例

而在多数场景之下,往往是采用注解的形式,诸如于 Airflow、Gradle 等。

基于注解与条件的 DAG 函数

回到研究的开始,如美银证券的 Quartz 的 DSL 扩展(Little languages),便是在 Loman 的形式上进行了一步扩展。使用注解代替了 Lambda:

class C:
  @dag
  def f1(self, x, y):
    return self.f2(x) + y
  @dag
  def f2(self, x):
    return x * x

围绕于这个注解,Quartz 在这一层的实现上,包含了四个特性: DAG、记忆化(memoization)、持久化、时间旅行调试(time travel)。考虑到 Quartz 并不是一个开源的实现,社区上的材料不一定靠谱,所以我们还是再看看 Apache Ariflow 的实现。引用官网的示例:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:

    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()

从实现上来说,Apache Airflow 的 DAG 实现本着 “工作流即代码” 的思想设计的。上面代码中,比较有意思的是 >> 语法,其是在任务之间定义了一个依赖关系并控制任务的执行顺序。

增量 DAG 注解:Gradle —— 监听输入与输出

在编译上,Gradle 也是支持增量编译(也是一种增量计算)的,我们可以先看个简单的示例:

abstract class IncrementalReverseTask extends DefaultTask {
    @Incremental
    @InputDirectory
    abstract DirectoryProperty getInputDir()

    @OutputDirectory
    abstract DirectoryProperty getOutputDir()

    @TaskAction
    void execute(InputChanges inputChanges) {
        inputChanges.getFileChanges(inputDir).each { change ->
            if (change.fileType == FileType.DIRECTORY) return

            def targetFile = outputDir.file(change.normalizedPath).get().asFile
            if (change.changeType == ChangeType.REMOVED) {
                targetFile.delete()
            } else {
                targetFile.text = change.file.text.reverse()
            }
        }
    }
}

对于 Gradle 的增量任务来说,通常只需要关注输入和输出,只要 InputDirectoryOutputDirectory 不变,那么就认为 Task 不需要再执行。因为在实现处理逻辑时,只关注于这两个值是否发生变化。

Rust 语言:Salsa 框架的增量 DAG 设计

Rust 编译器的文档上也包含了相关的介绍:Incremental compilation,而这里我们是一个相关的实现 —— 增量编译的设计者之一(Niko Matsakis)编写的库 Salsa。Salsa 是一个用于编写增量 (incremental) 、按需 (on-demand) 程序的 Rust 框架,其采用的是 “红-绿”算法。与 Gradle 相似的,Salsa 结构体(Structs)是使用一种 Salsa 属性宏进行了标注的结构体:

  • #[salsa::input]:用于指定计算的“基本输入”
  • #[salsa::tracked]:用于指定在计算过程中创建的中间值
  • #[salsa::interned]:用于指定易于进行相等比较的小型值

由于 Salsa 相比于 Gradle 是位于更底层的基础设施,所以需要手动构建存储层,即 Jar 和数据库)。数据库是一个结构体,它最终存储 Salsa 的所有中间状态,例如来自跟踪函数的被记忆的 (memoized) 返回值。数据库本身是以一些中间结构 (intermediate structure) 的形式定义的,这些中间结构被称为 jars,并包含每个函数的数据。

缓存计算与存储计算

既然,我们已经通过注解将输入、输出、函数等内容标注出来,下一步就是缓存结果。如此一来,我们就可以通过缓存来提升计算性能。对于计算的缓存来说,至少需要包含这三个部分:

  • 函数表达式(Fn 类型)。
  • 零个或多个参数。
  • 一个可选名称。

由此,我们才能获得缓存后的结果。在一些框架的设计里,诸如于 Python 语言

内存:Memoization —— 函数式编程的记忆

Memoization(记忆化)是函数式语言的一种特性,使用一组参数初次调用函数时,缓存参数和计算结果,当再次使用相同的参数调用该函数时,直接返回相应的缓存结果。在一些不支持 memoization 的语言里,需要手动引入这种设计,如 Java:

Map<Integer, Integer> cache = new ConcurrentHashMap<>();

Integer addOne(Integer x) {
    return cache.computeIfAbsent(x -> x + 1);
}

上述只是一个加法的示例,万能的 StackOverflow 上有更多的示例:Java memoization method

当然了,缓存是有负作用的 —— 第一次计算时存储结果会花费一定的时间,不过大部分情况下可以忽略不计。

数据库存储

对于耗时更长的 AI 或者是金融计算场景时,需要采用分布式的任务调度器,才能更快的得到计算结果。于是乎,采用分布式键值存储来对结果进行缓存就是更好的选择。在 Salsa 框架里,由于考虑到不同的类型(input、output、tracked 等),对于数据结构函数等来说,其对应的 Index 由三部分组成:

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct DatabaseKeyIndex {
    group_index: u16,
    query_index: u16,
    key_index: u32,
}

大抵是

增量计算框架与算法

由于时间与精力限制(主要是我看不懂一些用英语写的公式,还有暂时没打算学 OCaml),这里就没有展开对于各类计算框架论文的讨论。诸如于 IncrementalAdapton 就是相关的论文与实现,就包含了非常不错的资料。

除此:https://lord.io/spreadsheets/ 一文也给了非常好的介绍。

这里,我就不展开了。

有了增量计算,然后呢?

后续的计算部分,可以参考 Apache Airflow 来实现。它是一个支持开源分布式任务调度框架,其架构

  • 调度程序,它处理触发计划的工作流,并将任务提交给执行程序以运行。
  • 执行器,它处理正在运行的任务。 在默认的 Airflow 安装中,这会在调度程序中运行所有内容,但大多数适合生产的执行程序实际上会将任务执行推送给工作人员。
  • Web 服务器,它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的行为。
  • DAG 文件的文件夹,由调度程序和执行程序(以及执行程序拥有的任何工作人员)读取
  • 元数据数据库,由调度程序、执行程序和网络服务器用来存储状态。

其架构图如下:

Apache Airflow 架构

不过、过了、还是不过,考虑到 Airflow 的 DAG 实现是 Python,在分布式任务调度并不是那么流行。但是,作为一个参考还是非常不错的。

其他

相关参考资料:

除此,在构建工具方面,在这一方面微软研究院的《Build Systems à la Carte》提供了一个非常不错的介绍,如果你可以参考这一篇《【工业聚看论文】第一期:《Build Systems à la Carte: Theory and Practice》

关于我

Github: @phodal     微博:@phodal     知乎:@phodal    

微信公众号(Phodal)

围观我的Github Idea墙, 也许,你会遇到心仪的项目

QQ技术交流群: 321689806
comment

Feeds

RSS / Atom

最近文章

关于作者

Phodal Huang

Engineer, Consultant, Writer, Designer

ThoughtWorks 技术专家

工程师 / 咨询师 / 作家 / 设计学徒

开源深度爱好者

出版有《前端架构:从入门到微前端》、《自己动手设计物联网》、《全栈应用开发:精益实践》

联系我: h@phodal.com

微信公众号: 最新技术分享

标签