DAG: real_time_pd_dag real_time_task dag

schedule: 0:05:00


real_time_pd_dag

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-

import airflow
from intraday_pd import *
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator

# -------------------------------------------------------------------------------
# dag
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'TongYu',
    'catchup': False,
    'start_date': airflow.utils.dates.days_ago(1),
}
dag = DAG(
    'real_time_pd_dag',
    catchup=False,
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
    dagrun_timeout=timedelta(minutes=30),
    description='real_time_task dag')

basic_position_operator = PythonOperator(
    task_id='basic_position_task',
    python_callable=basic_position_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

basic_cash_flow_operator = PythonOperator(
    task_id='basic_cash_flow_task',
    python_callable=basic_cash_flow_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

basic_cash_flow_today_operator = PythonOperator(
    task_id='basic_cash_flow_today_task',
    python_callable=basic_cash_flow_today_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

basic_risks_operator = PythonOperator(
    task_id='basic_risks_task',
    python_callable=basic_risks_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

basic_underlyer_position_operator = PythonOperator(
    task_id='basic_underlyer_position_task',
    python_callable=basic_underlyer_position_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

basic_reference_data_operator = PythonOperator(
    task_id='basic_reference_data_task',
    python_callable=basic_reference_data_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

real_time_position_operator = PythonOperator(
    task_id='real_time_position_task',
    python_callable=real_time_position_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

real_time_expiring_position_operator = PythonOperator(
    task_id='real_time_expiring_position_task',
    python_callable=real_time_expiring_position_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

real_time_risk_operator = PythonOperator(
    task_id='real_time_risk_task',
    python_callable=real_time_risk_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

real_time_pnl_operator = PythonOperator(
    task_id='real_time_pnl_task',
    python_callable=real_time_pnl_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

real_time_valuation_operator = PythonOperator(
    task_id='real_time_valuation_task',
    python_callable=real_time_valuation_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

trade_notification_operator = PythonOperator(
    task_id='trade_notification_task',
    python_callable=trade_notification_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

basic_portfolio_trades_operator = PythonOperator(
    task_id='basic_portfolio_risk_task',
    python_callable=basic_portfolio_trades_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

real_time_portfolio_trades_operator = PythonOperator(
    task_id='real_time_portfolio_trades_task',
    python_callable=real_time_portfolio_trades_pd_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

real_time_mega_report_operator = PythonOperator(
    task_id='real_time_mega_report_task',
    python_callable=real_time_mega_report_run,
    execution_timeout=timedelta(minutes=15),
    dag=dag)

# -------------------------------------------------------------------------------
# Operator Dependency Relationship
basic_risks_operator.set_upstream(basic_position_operator)

real_time_position_operator.set_upstream(basic_risks_operator)
real_time_position_operator.set_upstream(basic_cash_flow_operator)
real_time_position_operator.set_upstream(basic_reference_data_operator)

real_time_expiring_position_operator.set_upstream(basic_risks_operator)
real_time_expiring_position_operator.set_upstream(basic_cash_flow_operator)

real_time_valuation_operator.set_upstream(basic_risks_operator)
real_time_valuation_operator.set_upstream(basic_cash_flow_operator)

real_time_risk_operator.set_upstream(real_time_position_operator)
real_time_risk_operator.set_upstream(basic_underlyer_position_operator)

real_time_pnl_operator.set_upstream(basic_risks_operator)
real_time_pnl_operator.set_upstream(basic_cash_flow_today_operator)
real_time_pnl_operator.set_upstream(basic_underlyer_position_operator)

real_time_portfolio_trades_operator.set_upstream(basic_risks_operator)
real_time_portfolio_trades_operator.set_upstream(basic_portfolio_trades_operator)

real_time_mega_report_operator.set_upstream(real_time_pnl_operator)
real_time_mega_report_operator.set_upstream(basic_cash_flow_operator)