0%

【大数据架构之旅】4 上手实践 dagster

上一讲中,我们介绍了 dagster 的基本概念和特点,这一次我们就要真正开始学习如何使用 dagster 来组织我们的 etl 项目。


更新历史

创建新项目

上次我们使用 dagster project 来新建项目,是默认自带的,相对来说比较简单,这次我们使用官方推荐的模板来创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 如果没有安装,先安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple dagster
# 从 example 中新建项目,更多 example,可以访问 https://github.com/dagster-io/dagster/tree/master/examples
dagster project from-example --name dagster-etl-demo --example quickstart_etl

# 安装成功后输出
> Downloading example 'quickstart_etl'. This may take a while.
> Success! Created dagster-etl-demo at /Users/wangda/Documents/Gitee/dagster-etl-demo/.

# 进入项目目录,安装项目依赖
cd dagster-etl-demo
pip install -e ".[dev]"
# 如果网络不给力,加上 -i 参数
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple -e ".[dev]"

# 启动 Dagster UI
dagster dev
# 启动之后,访问 http://localhost:3000 就可以查看项目了

开发之前

如果需要添加新的 python 包,在 setup.py 中修改,文件内容参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from setuptools import find_packages, setup

setup(
name="quickstart_etl",
packages=find_packages(exclude=["quickstart_etl_tests"]),
install_requires=[
"dagster",
"dagster-cloud",
"boto3",
"pandas",
"matplotlib",
"textblob",
"tweepy",
"wordcloud",
],
extras_require={"dev": ["dagit", "pytest"]},
)

如果有比较敏感的密钥,不能放在代码里,可以放在环境变量中,如果想要提前了解,可以参考 Using environment variables and secrets。当然,接下来的内容也会包含。

测试是很重要的,我们可以用如下命令运行测试 pytest quickstart_etl_tests(不需要停止之前的 dagster dev

文件夹结构

接下来我们来了解一下具体的文件夹和文件的含义,原文在这里。本文因为篇幅所限,仅列举一些经常需要修改或需要了解的配置:

  • 所有的 assets 都在要么在 assets.py(默认的模板)或在 quickstart_etl/__init__.py 中引入,如果有子文件夹,使用 load_assets_from_package_module 方法来引入,也可以直接使用 load_assets_from_modules 从单个 python 文件中引入
  • 配置文件(可选)有 dagster.yamldagster_cloud.yaml,用来定义存储位置,启动器,sensors 和 schedules
  • 配置文件(可选)还有 workspace.yaml,用来在本地开发总定义不同的代码位置

如果所有的代码在一个位置,那么一个 project 大概长这样:

1
2
3
4
5
6
7
8
9
10
11
.
├── README.md
├── my_dagster_project
│   ├── __init__.py
│   └── assets.py
├── my_dagster_project_tests
├── dagster.yaml ## optional, used for instance settings
├── pyproject.toml ## optional, used to define the project as a module
├── setup.cfg
├── setup.py
└── tox.ini

如果有代码放在不同位置,那么就需要 workspace.yaml 来指定,大概结构长这样:

1
2
3
4
5
6
7
8
9
10
11
12
.
├── README.md
├── my_dagster_project
│   ├── __init__.py
│   └── assets.py
├── my_dagster_project_tests
├── dagster.yaml ## optional, used for instance settings
├── pyproject.toml
├── setup.cfg
├── setup.py
├── tox.ini
└── workspace.yaml ## defines multiple code locations

特别提醒,如果不想发送使用信息,在 dagster.yaml 中添加如下内容禁用

1
2
telemetry:
enabled: false

正式开动

Dagster 是一个调度器,专门用于开发和维护数据资产,例如表格、数据集、机器学习模型和报告。

接下来我们将分析热门新闻聚合网站 Hacker News 上的活动。首先我们从该网站获取数据,对其进行清理,并生成一份报告。然后,我们将设置 Dagster 定时更新数据和报告,Dagster 称之为资产。

软件定义资产 Software-defined assets

资产是指在持久存储中的对象,这些对象代表着一些对这个世界的洞察。资产可以是任何类型的对象,例如:

  • 数据库表或视图
  • 文件,例如在您的本地机器上或类似于 Amazon S3 的 Blob 存储中的文件
  • 机器学习模型

如果您已经拥有现有的数据管道,则很可能已经有了资产。

软件定义的资产是 Dagster 的一个概念,允许我们按照它们生成的资产编写数据管道。它们的工作方式是:我们编写描述您想要存在的资产的代码,以及产生该资产的任何其他资产和可以运行以计算资产内容的函数。

下面是一个例子,我们定义了一个叫做 topstories 的资产,它是怎么来的呢?它是从另外一个名为 topstory_ids 的资产中获取不同的 ID,并找到这些 ID 的数据而组成的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
@asset
def topstories(topstory_ids): # 这里说明 topstories 这个资产,依赖于 topstory_ids 这个资产
results = []
# 这里就是 topstories 这个资产的产生过程
for item_id in topstory_ids:
item = requests.get(
f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
).json()
results.append(item)

return pd.DataFrame(results)

构建资产的 DAG

我们在浏览器中打开 all_asset_job 页面,就会发下如下图所示的工作流,我们称之为 DAG(directed acyclic graph)有向无环图

在这里我们可以非常清晰地看到不同的资产之间的关系,可以大幅降低大家在协作过程中的沟通成本(从面向过程到面向对象的飞跃)。

注:这一节重点介绍的是 asset,后续还会有更多有用的能力,比如 ops 和 jobsUnderstanding how software-defined assets relate to ops and graphs

实际运行

当我们想要执行这个工作流的时候,实际上我们就是要把这些具体的资产进行物化(Materialize),点击上图的黑色 Materialized All 按钮,就会发起一次运行(Run),我们可以在本次运行的查看页面看到具体的进展:

下面的面板中可以看到日志输出,我们可以清楚地看到任务进展,运行完成之后我们可以看到更多的信息,让我们来看看本次生成的词云吧(生成的是 base64 编码后的图像,在 hackernews_topstories_word_cloud 资产中可以看到!

代码解析

Asset 代码

别看上面的步骤复杂,其实就对应一个 Python 文件里的三个函数(具体解析写在代码注释中):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# 通过 @asset 声明是一个资产, -> 用来表示返回的类型
@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstory_ids() -> List[int]:
"""Get up to 500 top stories from the HackerNews topstories endpoint.
通过 API 获取 Top 500 的新闻
API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
"""
newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
# 访问 url 并拿到 JSON 返回,最后变成一个 List[int] 的数组
top_500_newstories = requests.get(newstories_url).json()
return top_500_newstories


# 这里的参数是 hackernews_topstory_ids,表示依赖该资产,最终得到一个 pd.DataFrame
# context 是一个资产上下文,传递信息和输出日志
@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
context: AssetExecutionContext, hackernews_topstory_ids: List[int]
) -> pd.DataFrame:
"""Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items.
从 HackerNews 获取具体的数据
API Docs: https://github.com/HackerNews/API#items
"""
results = []
for item_id in hackernews_topstory_ids:
# 访问 url 并拿到 JSON 返回,放到 DataFrame 中
item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
results.append(item)
# 每 20 条输出一条日志,可以在 Web UI 中查看
if len(results) % 20 == 0:
context.log.info(f"Got {len(results)} items so far.")

df = pd.DataFrame(results)

# Dagster supports attaching arbitrary metadata to asset materializations. This metadata will be
# shown in the run logs and also be displayed on the "Activity" tab of the "Asset Details" page in the UI.
# This metadata would be useful for monitoring and maintaining the asset as you iterate.
# Read more about in asset metadata in https://docs.dagster.io/concepts/assets/software-defined-assets#recording-materialization-metadata
context.add_output_metadata(
{
"num_records": len(df),
"preview": MetadataValue.md(df.head().to_markdown()),
}
)
return df


# 最后输出一个 bytes 序列,就是最终的图像,compute_kind 表示计算类型,可以自己指定
@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
context: AssetExecutionContext, hackernews_topstories: pd.DataFrame
) -> bytes:
"""Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.
Embed the plot into a Markdown metadata for quick view.

Read more about how to create word clouds in http://amueller.github.io/word_cloud/.
"""
stopwords = set(STOPWORDS)
stopwords.update(["Ask", "Show", "HN"])
titles_text = " ".join([str(item) for item in hackernews_topstories["title"]])
titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

# Generate the word cloud image
plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(titles_cloud, interpolation="bilinear")
plt.axis("off")
plt.tight_layout(pad=0)

# Save the image to a buffer and embed the image into Markdown content for quick view
buffer = BytesIO()
plt.savefig(buffer, format="png")
image_data = base64.b64encode(buffer.getvalue())
md_content = f"![img](data:image/png;base64,{image_data.decode()})"

# Attach the Markdown content as metadata to the asset
# Read about more metadata types in https://docs.dagster.io/_apidocs/ops#metadata-types
context.add_output_metadata({"plot": MetadataValue.md(md_content)})

return image_data

调度代码

当我们完成三个 asset 的构造后,我们就可以去设置具体的调度,代码在 quickstart_etl/__init__.py 中,具体如下:

1
2
3
4
5
6
7
8
9
# 设定要执行的 job 以及 cron 表达式
daily_refresh_schedule = ScheduleDefinition(
job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
)

# 载入所有的 assets,并构造 schedules
defs = Definitions(
assets=load_assets_from_package_module(assets), schedules=[daily_refresh_schedule]
)

数据持久化

当我们运行完成之后,一切仿佛那么完美,突然屏幕上出现如下信息:

1
2
3
4
5
6
7
8
9
  File "/Users/wangda/Dev/venvs/xsignal39/lib/python3.9/site-packages/dagster/_daemon/cli/__init__.py", line 82, in _daemon_run_command
controller.check_daemon_loop()
File "/Users/wangda/Dev/venvs/xsignal39/lib/python3.9/site-packages/dagster/_daemon/controller.py", line 296, in check_daemon_loop
self.check_daemon_heartbeats()
File "/Users/wangda/Dev/venvs/xsignal39/lib/python3.9/site-packages/dagster/_daemon/controller.py", line 265, in check_daemon_heartbeats
raise Exception("Stopped dagster-daemon process due to thread heartbeat failure")
Exception: Stopped dagster-daemon process due to thread heartbeat failure
2023-07-02 23:39:27 +0800 - dagster - INFO - Shutting down Dagster services...
2023-07-02 23:39:27 +0800 - dagster - INFO - Dagster services shut down.

再重新启动发现,数据全没了!这是为什么呢?因为没有持久化呀,所有的数据都在一个临时的文件夹中,如下图的 tmpl1mnr02h

当本次的 dagster dev 的进程退出的时候,这个文件夹就彻底消失了。为了解决这个问题,我们需要把数据保存在某个地方,这个时候就要用到 I/O Manager 的能力。

当然,持久化的第一个做法,就是设置 $DAGSTER_HOME 这个环境变量,这样我们数据就会保存在此处,但是这样的做法也有不好的地方,就是不同的项目都只能保存在同一个文件系统下,不够灵活。所以我们接下来就看看如何改动代码,让数据保存在固定文件夹下吧。

写入文件系统

首先我们要先声明 io_manager,修改完后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from dagster import (
Definitions,
ScheduleDefinition,
define_asset_job,
load_assets_from_package_module,
FilesystemIOManager, # dagster 内置的包
)

from . import assets

daily_refresh_schedule = ScheduleDefinition(
job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
)

# 在 defs 之间定义
io_manager = FilesystemIOManager(
base_dir = "data" # 相对路径,在 dagster dev 运行的文件夹下
)

# 在 defs 中添加其为 resource
defs = Definitions(
assets=load_assets_from_package_module(assets),
schedules=[daily_refresh_schedule],
resources={
"io_manager": io_manager,
},
)

接下来我们再次运行 dagster dev,就可以看到数据保存在了名为 data 的文件夹中,而不是之前那个乱码一般的临时文件了。

写入数据库

Dagster 同样内置了常用数据库的 IO Manager,可以将 DataFrame 写入数据库,这里我们使用 DuckDB 作为例子。很多人看到这个 DB 可能感觉比较陌生,我问了一下 New Bing,相信能有一些帮助:

问题:DuckDB 是什么,介绍一下功能特点,和 SQLite 或者 MySQL 有什么区别

DuckDB 是一个基于本地文件系统的列式数据库,它的设计思路来源于 SQLite,但是与 SQLite 不同的是,SQLite 是行式数据库,而 DuckDB 是列式数据库。除此以外,两者非常相似:两个都是基于本地文件系统设计的,都有着完整的数据类型支持和 SQL 语法支持。

DuckDB 对标 SQLite,都是嵌入式数据库。SQLite 是嵌入式的 OLTP ,而 DuckDB 是嵌入式 OLAP 。DuckDB 相比 MySQL 和 SQLite 的优势还是比较明显的。 MySQL 和 SQLite 都无法支持完整的 SQL 语法,但是 DuckDB 可以支持 SQL 语法。

DuckDB 的特点包括:

  • 高性能:DuckDB 的查询性能比 SQLite 和 MySQL 都要快。
  • 支持 SQL 语法:DuckDB 支持完整的 SQL 语法。
  • 嵌入式:DuckDB 是嵌入式 OLAP 数据库,可以理解为 AP 版本的 SQLite。

简单来说就是在本地可用的列式数据库,我们按照同样的方法,在 __init__.py 中添加 database_io_manager,我们需要先 pip install dagster_duckdb_pandas 来安装对应包,调整完成之后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from dagster import (
Definitions,
ScheduleDefinition,
define_asset_job,
load_assets_from_package_module,
FilesystemIOManager,
)

# pip install dagster_duckdb_pandas
from dagster_duckdb_pandas import DuckDBPandasIOManager

from . import assets

daily_refresh_schedule = ScheduleDefinition(
job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
)

io_manager = FilesystemIOManager(
base_dir = "data" # 相对路径,在 dagster dev 运行的文件夹下
)

# Insert this section anywhere above your `defs = Definitions(...)`
database_io_manager = DuckDBPandasIOManager(database="analytics.hackernews")

defs = Definitions(
assets=load_assets_from_package_module(assets),
schedules=[daily_refresh_schedule],
resources={
"io_manager": io_manager,
"database_io_manager": database_io_manager, # Define the I/O manager here
},
)

接下来就是有意思的地方了,设置好了新的 IO Manager 之后,实际上我们不需要再去对 asset 的代码做任何修改,只需要在声明的时候指定 io manager 即可,代码如下:

1
2
3
4
5
@asset(group_name="hackernews", compute_kind="HackerNews API", io_manager_key="database_io_manager")
def hackernews_topstories(
context: AssetExecutionContext, hackernews_topstory_ids: List[int]
) -> pd.DataFrame:
....

再运行一下我们就会发现,topstories 放到了 DuckDB 中,其他的数据还在本地的 data 文件夹里(但是运行记录等数据,仍然是不在的,这也是为什么我们现在使用的是开发模式,而非生产模式)。

关于数据分区

当我们的数据越来越多的时候,内存中无法一次放得下,我们就需要去尝试将数据分区并按需加载到内存中。

创建无 IO 的 Asset

Asset 一定要使用 IO Manager 吗?这个答案也是否定的。当我们不希望吧数据载入到内存中时,我们可以不返回任何数据,并且可以在依赖资产的函数中定义不加载数据的依赖项。这样还是太抽象,以下是一些常用的场景:

  • 自行管理I/O,使用其他写入存储的库/工具
  • 想运行一个SQL查询,创建或更新数据库中的表
  • 加载的数据无法完整放到内存中
  • 使用I/O Manager 编写管道是可能的,但您不想使用这个Dagster特定的概念。例如,您有一个现有的管道,不想重构它。

接下来,我们就来体验一下这个场景:从互联网下载文件。因为文件是直接下载到计算机上而不是加载到内存并返回,所以不需要使用I/O管理器将数据写回存储器。在 quickstart_etl/assets/hackernews.py 中添加如下代码:

1
2
3
4
5
6
@asset
def stopwords_zip() -> None:
urllib.request.urlretrieve(
"https://docs.dagster.io/assets/stopwords.zip",
"data/stopwords.zip",
)

这里有两个特别的点:

  1. 没有返回语句,因为这个 zip 数据就只是要下载下来,不需要做其他操作
  2. None 标记了返回类型,告诉 Dagster 不需要使用 I/O manager

点击该 asset 然后点击 Materialize Selected,如果在 data 文件夹下看到 stopwords.zip,就说明执行成功了。

不载入内存直接操作 asset

默认情况下,当 asset 生成时,它会将其依赖项加载到内存中。然而,有些情况下你可能想要避免这样做。如果你直接在存储中操作数据,比如解压缩一个压缩文件,你不需要将数据加载到内存中。

这种时候,我们需要做的就是在 asset 的声明中添加 non_argument_deps 参数,来告诉 dagster 不需要去将这里提到的资产载入内容,如下面的代码所示:

1
2
3
4
5
6
7
8
@asset(group_name="hackernews", non_argument_deps={"stopwords_zip"})
def stopwords_csv() -> None:
"""将下载下来的 zip 文件进行解压

用于演示不需要将依赖载入内存,相当于直接操作文件系统(或数据库之类的)
"""
with zipfile.ZipFile("data/stopwords.zip", "r") as zip_ref:
zip_ref.extractall("data")

将用与不用 I/O Manager 的资产进行组合

虽然部分 asset 不用 I/O Manager,总是要和用 I/O Manager 的资产配合,不然就没有办法去最终生成有用的数据,前面我们从 csv 文件拿到了一系列的 stopwords,接下来就可以用网上下载的来替换我们之前写死在代码中的数组了,具体的代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 用 non_argument_deps 把相关资产串起来
@asset(group_name="hackernews", compute_kind="Plot", non_argument_deps={"stopwords_csv"})
def hackernews_topstories_word_cloud(
context: AssetExecutionContext, hackernews_topstories: pd.DataFrame
) -> bytes:
"""Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.
Embed the plot into a Markdown metadata for quick view.

Read more about how to create word clouds in http://amueller.github.io/word_cloud/.
"""
#stopwords = set(STOPWORDS)
#stopwords.update(["Ask", "Show", "HN"])
with open("data/stopwords.csv", "r") as f:
stopwords = {row[0] for row in csv.reader(f)}

titles_text = " ".join([str(item) for item in hackernews_topstories["title"]])
titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

重新刷新一下就会发现,相关的 asset 已经串联起来了,如下图所示:

管理不同的资源 Resource

Resource 是 Dagster 中的一个对象,主要用于连接外部的服务,比如上面的 HackerNews,以及未来可能用到 Amazon S3 之类的。我们在 topstory_idstopstories 资产中都用了 requests 这个库去访问外部数据,这类代码最终会变得越来越难维护,如果:

  • 多个 asset 都需要访问同样的地址
  • 外部服务访问时需要进行认证鉴权
  • 在使用过程中的配置需要保证不变

当我们整体的数据流水线越来越庞大时,梳理不同 asset 使用的不同的数据连接也会是非常麻烦的事情,所以用 resource 这个概念来组织代码,有如下好处:

  • 可以灵活配置资源
  • 各个资产之间的使用方式是一致的标准的
  • 在 Dagster UI 中可以进行统一的监控

现在我们需要在 quickstart_etl 文件夹中新建一个 resources 文件夹,然后把 data_generator.py 放到其中(因为本文没有严格按照官方教程的流程来,所以要补上)。然后我们来更新一下 quickstart/__init__.py 中的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from .resources import data_generator

# ...

datagen = data_generator.DataGeneratorResource() # Make the resource

defs = Definitions(
assets=[*hackernews_assets],
schedules=[hackernews_schedule],
resources={
"io_manager": io_manager,
"database_io_manager": duckdb_io_manager,
"hackernews_api": datagen, # Add the newly-made resource here
},
)

回到 Dagster UI,在 Overview 的 Resources 里,可以看到已经有我们添加的 hackernews_api 了,但是暂时我们还没有修改代码,所以 Uses 是 0。

所以接下来我们就来使用一下这个 resource。在 assets/hackernews.py 中添加一个新的 assets,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from dagster import AssetExecutionContext, MetadataValue, Output, asset

@asset(group_name="hackernews", compute_kind="HackerNews API")
def signups(hackernews_api: data_generator.DataGeneratorResource):
signups = pd.DataFrame(hackernews_api.get_signups())

return Output(
value=signups,
metadata={
"Record Count": len(signups),
"Preview": MetadataValue.md(signups.head().to_markdown()),
"Earliest Signup": signups["registered_at"].min(),
"Latest Signup": signups["registered_at"].max(),
},
)

重新载入一下,我们就可以发现 Uses 变成 2 了,详情如下图所示:

Resource 本身也是可以接收不同的参数用于配置的,比如 hackernews_api 这个 resource 就有 num_daysseed 两个参数配置,如下图所示:

我们可以直接在声明的时候进行配置,比如生成 365 天的注册用户,这样数量多一点,还是修改 quickstart_etl/__init__.py 的定义:

1
2
3
4
5
6
datagen = data_generator.DataGeneratorResource(num_days=365)

defs = Definitions(
# ...
resources={"hackernews_api": datagen}
)

再接下来问题来了,假设我们已经将代码部署到生产环境的 Dagster 实例中。我们继续在开发过程中进行调试时,发现在本地开发阶段不需要一年的数据。这时候,就可以通过使用环境变量,根据 Dagster 所运行的环境来配置 DataGeneratorResource 的行为。在开发环境中,我们只获取过去 30 天的数据,加快开发速度。而在生产环境中,我们还是获取 365 天的数据。

我们在项目根目录新建一个 .env 文件,内容如下:

1
HACKERNEWS_NUM_DAYS_WINDOW=30

然后修改我们的 __init__.py 文件,引入这个环境变量并刘改代码,代码如下:

1
2
3
datagen = data_generator.DataGeneratorResource(
num_days=EnvVar.int("HACKERNEWS_NUM_DAYS_WINDOW"), # 不要忘记这个逗号
) # Make the resource

再次执行一下我们就会发现,现在只生成过去 30 天的数据了。

至此,本次教程的全部内容就结束了,记得一定要自己动手试一次噢。

参考材料