# -*- coding: utf-8 -*-
import os
import airflow
from intraday_pd import *
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
# -------------------------------------------------------------------------------
# dag
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
# set the environment variable MKT_DATA=on to give this dag a schedule;
# with any other value (or unset) schedule_interval is None
# Defaults handed to the DAG below via its ``default_args`` parameter.
default_args = dict(
    owner='TongYu',
    # NOTE(review): catchup is also passed directly to DAG(...) below;
    # this entry looks redundant -- confirm before removing.
    catchup=False,
    # Rolling start date: one day before the moment this file is parsed.
    start_date=airflow.utils.dates.days_ago(1),
)
# The DAG object itself.  The cron expression '* * * * 1-5' fires every
# minute, Monday through Friday; it is only used when the MKT_DATA
# environment variable equals 'on'.  Otherwise schedule_interval is None.
dag = DAG(
    dag_id='real_time_market_data_update_dag',
    description='real_time_market_data_update dag',
    default_args=default_args,
    schedule_interval=(
        '* * * * 1-5' if os.getenv('MKT_DATA') == 'on' else None
    ),
    catchup=False,
    dagrun_timeout=timedelta(minutes=15),
)
def scheduler():
    """Tell whether the current wall-clock time is inside an update window.

    Compares ``datetime.now()`` against four boundaries built with
    ``utils.trans_datetime`` (defined outside this file -- presumably it
    turns an 'H:MM:SS' string into a datetime on the current day; confirm).

    Returns:
        bool: ``False`` when now is before the '9:30:00' boundary, after
        the '15:00:30' boundary, or strictly between the '11:30:00' and
        '13:00:30' boundaries; ``True`` otherwise (boundary instants
        themselves count as inside).
    """
    now = datetime.now()
    first_window_start = utils.trans_datetime('9:30:00')
    first_window_end = utils.trans_datetime('11:30:00')
    second_window_start = utils.trans_datetime('13:00:30')
    second_window_end = utils.trans_datetime('15:00:30')

    # Negation of "too early, too late, or in the gap between windows".
    return not (
        now < first_window_start
        or now > second_window_end
        or first_window_end < now < second_window_start
    )
# Gate task: ShortCircuitOperator runs ``scheduler`` and, when it returns a
# falsy value, skips the tasks downstream of it for this run.
scheduler_operator = ShortCircuitOperator(
    dag=dag,
    task_id='scheduler_task',
    python_callable=scheduler,
    provide_context=False,
)
# Worker task: calls ``real_time_market_data`` (not defined in this file --
# presumably supplied by the ``intraday_pd`` star import; confirm) and is
# given at most five minutes to finish.
market_data_update_operator = PythonOperator(
    dag=dag,
    task_id='market_data_update_task',
    python_callable=real_time_market_data,
    execution_timeout=timedelta(minutes=5),
)
# Dependency edge: scheduler_task -> market_data_update_task.
market_data_update_operator.set_upstream(scheduler_operator)
|