# Workflow DSL Research

## Key Design Questions for a Workflow DSL
- How the DSL expresses dependencies between tasks
- How code reuse is supported
- Input parameters and environment variables
- Passing information between tasks
- Passing files between tasks
- In a distributed architecture, how to specify which node a task runs on
- How to configure the tools and environment a task needs
## GitLab CI
https://docs.gitlab.com/ee/ci/yaml/
```yaml
include:
  - local: '/templates/.gitlab-ci-template.yml'

.tests:
  script: rake test
  stage: test
  only:
    refs:
      - branches

rspec:
  extends: .tests
  script: rake rspec
  only:
    variables:
      - $RSPEC

job1:
  before_script:
    - echo "Execute this command before any 'script:' commands."
  script:
    - echo "An example script section."
  after_script:
    - echo "Execute this command after the `script` section completes."
  artifacts:
    paths:
      - binaries/
    exclude:
      - binaries/**/*.o

job2:
  retry: 2
  timeout: 3h 30m
  needs:
    - job1
  script:
    - echo "hello job2"
  tags:
    - agent1
    - windows_agent2

rules_job:
  script: echo "Hello, Rules!"
  rules:
    - if: '$CI_MERGE_REQUEST_SOURCE_BRANCH_NAME =~ /^feature/ && $CI_MERGE_REQUEST_TARGET_BRANCH_NAME != $CI_DEFAULT_BRANCH'
      when: never
    - if: '$CI_MERGE_REQUEST_SOURCE_BRANCH_NAME =~ /^feature/'
      when: manual
      allow_failure: true
    - if: '$CI_MERGE_REQUEST_SOURCE_BRANCH_NAME'

trigger_job:
  stage: deploy
  trigger: my/deployment
```
In GitLab CI syntax, `needs` declares dependencies between jobs. `before_script`, `script`, and `after_script` are the three phases of a job's execution, and each may contain any number of commands. `artifacts` declares the files a job produces, `tags` selects the runners a job may execute on, and `retry` and `timeout` control retry behavior and the time limit. `rules` specifies the conditions under which a job runs, and `trigger` can fire off another pipeline. Other files can be pulled in with `include`, and `extends` provides something like subclass inheritance in an object-oriented language, which is quite powerful.
### YAML Anchors and Aliases

Worth highlighting: YAML's anchor and alias syntax lets you share a fragment of content within a YAML file instead of typing it repeatedly.
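As a minimal sketch (the job names here are hypothetical), an anchor (`&`) names a fragment, an alias (`*`) reuses it, and the `<<` merge key splices it into another mapping:

```yaml
.defaults: &defaults        # anchor: give this mapping a name
  image: alpine:latest
  retry: 2

build:
  <<: *defaults             # alias + merge key: reuse the anchored mapping
  script: echo "build"

test:
  <<: *defaults             # the same defaults, written once
  script: echo "test"
```

Note that anchors are resolved by the YAML parser itself, so unlike `extends` they cannot reach across files pulled in via `include`.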
## Azure DevOps
https://docs.microsoft.com/en-us/azure/devops/pipelines/yaml-schema
Azure DevOps pipelines are also defined in YAML, and the overall design closely resembles GitLab CI. One distinctive feature is its template mechanism, which works differently from GitLab CI's `extends` pattern:
```yaml
# File: jobs/build.yml
parameters:
  name: ''
  pool: ''
  sign: false

jobs:
- job: ${{ parameters.name }}
  pool: ${{ parameters.pool }}
  steps:
  - script: npm install
  - script: npm test
  - ${{ if eq(parameters.sign, 'true') }}:
    - script: sign
```

```yaml
# File: azure-pipelines.yml
jobs:
- template: jobs/build.yml  # Template reference
  parameters:
    name: macOS
    pool:
      vmImage: 'macOS-latest'
- template: jobs/build.yml  # Template reference
  parameters:
    name: Linux
    pool:
      vmImage: 'ubuntu-latest'
- template: jobs/build.yml  # Template reference
  parameters:
    name: Windows
    pool:
      vmImage: 'windows-latest'
    sign: true  # Extra step on Windows only
```
Within a template, parameters are substituted via `${{ }}`, while variables are referenced via `$( )`. Clearly separating template parameters from ordinary variables helps with debugging, because the user knows exactly at which stage each substitution happens, though it does add some learning overhead.
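A small sketch of the difference (the file and parameter names here are made up for illustration): `${{ }}` template expressions are expanded when the pipeline YAML is compiled, before anything runs, whereas `$( )` macro syntax is resolved at runtime when the step executes:

```yaml
# File: steps/echo.yml (a hypothetical template)
parameters:
  configuration: 'Release'

steps:
# ${{ }} is a template expression: expanded at YAML compile time
- script: echo "configuration = ${{ parameters.configuration }}"
# $( ) is macro syntax: resolved at runtime from pipeline variables
- script: echo "build id = $(Build.BuildId)"
```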
## Airflow
https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
```python
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]
```
Airflow's configuration is plain Python code: each Operator instance is a task, and constructor arguments control settings such as retries and timeouts. Dependencies between tasks are declared with the `>>` and `<<` operators, or equivalently with an Operator's `set_downstream` / `set_upstream` methods. An Operator's `queue` argument can route a task to particular worker nodes, and the `pre_execute` / `post_execute` hooks handle events when a task starts and finishes.
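A short sketch of those knobs (task ids are hypothetical; `queue` takes effect under the CeleryExecutor, where workers subscribe to named queues):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('routing_example', start_date=datetime(2021, 1, 1), catchup=False) as dag:
    prepare = BashOperator(task_id='prepare', bash_command='echo prepare')
    train = BashOperator(
        task_id='train',
        bash_command='python train.py',
        queue='gpu',  # picked up only by Celery workers listening on the 'gpu' queue
    )
    # equivalent to: prepare >> train
    prepare.set_downstream(train)
```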
As for sharing data between tasks, the built-in XCom mechanism covers small serializable values, but beyond that there seems to be no effective built-in support; online discussions suggest implementing it yourself, for example with callback functions.
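For small values, XCom usage looks roughly like this (a sketch; the task ids and key are hypothetical):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def push(ti):
    # store a small, serializable value under a key
    ti.xcom_push(key='row_count', value=42)

def pull(ti):
    count = ti.xcom_pull(task_ids='producer', key='row_count')
    print(f'received {count}')

with DAG('xcom_example', start_date=datetime(2021, 1, 1), catchup=False) as dag:
    producer = PythonOperator(task_id='producer', python_callable=push)
    consumer = PythonOperator(task_id='consumer', python_callable=pull)
    producer >> consumer
```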
## The Advantage of Using a Programming Language as the DSL

Using a general-purpose programming language as the DSL means the language's own features (loops, functions, modules) can be used to generate workflow definitions dynamically.
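For example, since an Airflow DAG file is ordinary Python, a fan-in of tasks can be produced with a plain loop; a minimal sketch with hypothetical task names:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('fan_in_example', start_date=datetime(2021, 1, 1), catchup=False) as dag:
    merge = BashOperator(task_id='merge', bash_command='echo merge')
    # the DSL is plain Python, so the task list can be computed at parse time
    for shard in range(4):
        extract = BashOperator(
            task_id=f'extract_{shard}',
            bash_command=f'echo extract shard {shard}',
        )
        extract >> merge
```

A static YAML DSL would need an explicit templating or matrix feature to express the same structure.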