Workflow DSL Research

Key Design Points of a Workflow DSL

How does the DSL express dependencies between tasks?

How is code reuse supported?

Input parameters and environment variables

Passing information between tasks

Passing files between tasks

In a distributed execution architecture, how is the node a task runs on specified?

Tools and environment setup required for running tasks

Gitlab-CI

https://docs.gitlab.com/ee/ci/yaml/

include:
  - local: '/templates/.gitlab-ci-template.yml'

.tests:
  script: rake test
  stage: test
  only:
    refs:
      - branches

rspec:
  extends: .tests
  script: rake rspec
  only:
    variables:
      - $RSPEC

job1:
  before_script:
    - echo "Execute this command before any 'script:' commands."
  script:
    - echo "An example script section."
  after_script:
    - echo "Execute this command after the `script` section completes."
  artifacts:
    paths:
      - binaries/
    exclude:
      - binaries/**/*.o

job2:
  retry: 2
  timeout: 3h 30m
  needs:
    - job1
  script:
    - echo "hello job2"
  tags:
    - agent1
    - windows_agent2

rules_job:
  script: echo "Hello, Rules!"
  rules:
    - if: '$CI_MERGE_REQUEST_SOURCE_BRANCH_NAME =~ /^feature/ && $CI_MERGE_REQUEST_TARGET_BRANCH_NAME != $CI_DEFAULT_BRANCH'
      when: never
    - if: '$CI_MERGE_REQUEST_SOURCE_BRANCH_NAME =~ /^feature/'
      when: manual
      allow_failure: true
    - if: '$CI_MERGE_REQUEST_SOURCE_BRANCH_NAME'

trigger_job:
  stage: deploy
  trigger: my/deployment

In gitlab-ci syntax, needs declares dependencies between jobs; before_script, script, and after_script are the three phases of a job's execution, and each can contain any number of shell commands.

artifacts declares the files a job produces, tags selects the runners a job may execute on, and retry and timeout control how many times a job is retried and how long it may run.

rules specifies the conditions under which a job runs, and trigger can launch a downstream pipeline.

Other files can be brought in with include, and extends works like subclass inheritance in an object-oriented language; together they are quite powerful.

YAML Anchors and Aliases

Particularly worth mentioning: the YAML anchors-and-aliases syntax lets a block of content be defined once and shared within a YAML file, so it does not have to be typed repeatedly.
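A minimal sketch of the syntax (the job names and image are illustrative): & defines an anchor on a mapping, * references it, and the merge key << splices the anchored content into another mapping.

.defaults: &defaults
  image: ruby:3.1   # illustrative image
  retry: 2

unit_test:
  <<: *defaults     # merge the anchored mapping into this job
  script: rake test

lint:
  <<: *defaults
  script: rake lint

Note that anchors only work within a single file; they cannot be shared across files pulled in via include.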

Azure DevOps

https://docs.microsoft.com/en-us/azure/devops/pipelines/yaml-schema

Azure DevOps also defines pipelines in YAML, and the overall design is much like gitlab-ci's.

It has a distinctive template mechanism, different from gitlab-ci's extends model:

# File: jobs/build.yml

parameters:
  name: ''
  pool: ''
  sign: false

jobs:
- job: ${{ parameters.name }}
  pool: ${{ parameters.pool }}
  steps:
  - script: npm install
  - script: npm test
  - ${{ if eq(parameters.sign, 'true') }}:
    - script: sign

# File: azure-pipelines.yml

jobs:
- template: jobs/build.yml  # Template reference
  parameters:
    name: macOS
    pool:
      vmImage: 'macOS-latest'

- template: jobs/build.yml  # Template reference
  parameters:
    name: Linux
    pool:
      vmImage: 'ubuntu-latest'

- template: jobs/build.yml  # Template reference
  parameters:
    name: Windows
    pool:
      vmImage: 'windows-latest'
    sign: true  # Extra step on Windows only

Inside a template, parameters are substituted using the ${{ }} syntax, while variables are referenced with $( ).

Explicitly distinguishing template parameters from ordinary variables helps with debugging, since the user knows exactly at which stage each substitution happens, though it does add some learning cost.
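A minimal sketch of the difference (the parameter name is illustrative): ${{ }} is expanded when the pipeline is compiled, before any job starts, while $( ) is resolved at runtime.

parameters:
- name: configuration
  default: 'Release'

steps:
# ${{ parameters.configuration }} is replaced at template-expansion time;
# $(Build.BuildId) is a predefined variable resolved when the step runs.
- script: echo "Building ${{ parameters.configuration }} for run $(Build.BuildId)"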

Airflow

https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]

Airflow's configuration is just Python code: each Operator instance is a task, and operator arguments control settings such as retries and timeouts.

Dependencies between tasks are declared with the >> and << operators, or by calling set_downstream / set_upstream on an Operator object.
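A minimal sketch of the equivalent forms (the DAG id and task ids are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('dependency_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    t1 = BashOperator(task_id='t1', bash_command='echo 1')
    t2 = BashOperator(task_id='t2', bash_command='echo 2')
    t3 = BashOperator(task_id='t3', bash_command='echo 3')

    # Each of these lines declares the same edge, "t1 runs before t2":
    t1 >> t2
    # t2 << t1
    # t1.set_downstream(t2)

    t2 >> t3  # chains further: t1 -> t2 -> t3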

An Operator's queue argument controls which workers a task is routed to (when running with the Celery executor), and the pre_execute and post_execute hooks handle events at the start and end of a task.

Sharing data between tasks is limited: the built-in XCom mechanism can pass small values between tasks, but there is no effective way to hand larger data around, which typically has to go through external storage.
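A minimal sketch of XCom (the DAG and task ids are illustrative; assumes Airflow 2.x, where the ti task-instance argument is injected based on the callable's signature):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def push(ti):
    # Store a small value under an explicit key
    ti.xcom_push(key='sample', value='hello from push')

def pull(ti):
    # Retrieve the value pushed by the upstream task
    print(ti.xcom_pull(task_ids='push_task', key='sample'))

with DAG('xcom_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    push_task = PythonOperator(task_id='push_task', python_callable=push)
    pull_task = PythonOperator(task_id='pull_task', python_callable=pull)
    push_task >> pull_task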

Benefits of Using a Programming Language as the DSL

Using a programming language as the DSL means the language's own features can be leveraged to generate the workflow definition dynamically.
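For example, since an Airflow DAG file is ordinary Python, a loop can stamp out a family of similar tasks instead of copy-pasting blocks of YAML (a minimal sketch; the DAG id and task ids are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('dynamic_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    start = BashOperator(task_id='start', bash_command='echo start')
    # Generate one downstream task per shard programmatically
    for shard in range(4):
        worker = BashOperator(
            task_id=f'process_shard_{shard}',
            bash_command=f'echo processing shard {shard}',
        )
        start >> worker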
