DAG: real_time_risk_dag real_time_risk dag

schedule: None


real_time_risk_dag

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# -*- coding: utf-8 -*-

import airflow
from intraday_pd import *
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator

# -------------------------------------------------------------------------------
# dag
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'TongYu',
    'catchup': False,
    'start_date': airflow.utils.dates.days_ago(1),
}
dag = DAG(
    'real_time_risk_dag',
    catchup=False,
    default_args=default_args,
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=15),
    description='real_time_risk dag')

basic_position_operator = PythonOperator(
    task_id='basic_position_task',
    python_callable=basic_position_pd_run,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

basic_cash_flow_operator = PythonOperator(
    task_id='basic_cash_flow_task',
    python_callable=basic_cash_flow_pd_run,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

basic_risks_operator = PythonOperator(
    task_id='basic_risks_task',
    python_callable=basic_risks_pd_run,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

basic_underlyer_position_operator = PythonOperator(
    task_id='basic_underlyer_position_task',
    python_callable=basic_underlyer_position_pd_run,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

basic_reference_data_operator = PythonOperator(
    task_id='basic_reference_data_task',
    python_callable=basic_reference_data_pd_run,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

real_time_position_operator = PythonOperator(
    task_id='real_time_position_task',
    python_callable=real_time_position_pd_run,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

real_time_risk_operator = PythonOperator(
    task_id='real_time_risk_task',
    python_callable=real_time_risk_pd_run,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

risk_notification_operator = PythonOperator(
    task_id='real_time_risk_notification_task',
    python_callable=intraday_risk_report_notifier,
    execution_timeout=timedelta(minutes=5),
    dag=dag)

# -------------------------------------------------------------------------------
# Operator Dependency Relationship
basic_risks_operator.set_upstream(basic_position_operator)

real_time_position_operator.set_upstream(basic_risks_operator)
real_time_position_operator.set_upstream(basic_cash_flow_operator)
real_time_position_operator.set_upstream(basic_reference_data_operator)

real_time_risk_operator.set_upstream(basic_underlyer_position_operator)
real_time_risk_operator.set_upstream(real_time_position_operator)

real_time_risk_operator.set_downstream(risk_notification_operator)