# -*- coding: utf-8 -*-
import airflow
from intraday_pd import *
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
# -------------------------------------------------------------------------------
# dag
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'TongYu',
'catchup': False,
'start_date': airflow.utils.dates.days_ago(1),
}
dag = DAG(
'real_time_pnl_dag',
catchup=False,
default_args=default_args,
schedule_interval=None,
dagrun_timeout=timedelta(minutes=15),
description='real_time_pnl dag')
basic_position_operator = PythonOperator(
task_id='basic_position_task',
python_callable=basic_position_pd_run,
execution_timeout=timedelta(minutes=5),
dag=dag)
basic_cash_flow_today_operator = PythonOperator(
task_id='basic_cash_flow_today_task',
python_callable=basic_cash_flow_today_pd_run,
execution_timeout=timedelta(minutes=1),
dag=dag)
basic_risks_operator = PythonOperator(
task_id='basic_risks_task',
python_callable=basic_risks_pd_run,
execution_timeout=timedelta(minutes=5),
dag=dag)
basic_underlyer_position_operator = PythonOperator(
task_id='basic_underlyer_position_task',
python_callable=basic_underlyer_position_pd_run,
execution_timeout=timedelta(minutes=5),
dag=dag)
real_time_pnl_operator = PythonOperator(
task_id='real_time_pnl_task',
python_callable=real_time_pnl_pd_run,
execution_timeout=timedelta(minutes=5),
dag=dag)
pnl_notification_operator = PythonOperator(
task_id='real_time_pnl_notification_task',
python_callable=intraday_pnl_report_notifier,
execution_timeout=timedelta(minutes=5),
dag=dag)
# -------------------------------------------------------------------------------
# Operator Dependency Relationship
basic_risks_operator.set_upstream(basic_position_operator)
real_time_pnl_operator.set_upstream(basic_risks_operator)
real_time_pnl_operator.set_upstream(basic_cash_flow_today_operator)
real_time_pnl_operator.set_upstream(basic_underlyer_position_operator)
real_time_pnl_operator.set_downstream(pnl_notification_operator)