DAG: real_time_market_data_update_dag (description: real_time_market_data_update dag)

schedule: None


real_time_market_data_update_dag

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# -*- coding: utf-8 -*-

import os
import airflow
from intraday_pd import *
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator

# -------------------------------------------------------------------------------
# dag
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

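# Set the environment variable MKT_DATA to "on" to give this DAG its cron
# schedule ('* * * * 1-5'); with any other value, or when the variable is
# unset, schedule_interval is None.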
# Defaults handed to the DAG through ``default_args``.
default_args = dict(
    owner='TongYu',
    catchup=False,
    start_date=airflow.utils.dates.days_ago(1),
)

# The cron expression is used only when MKT_DATA is exactly 'on'; otherwise
# the DAG is created with schedule_interval=None.
dag = DAG(
    dag_id='real_time_market_data_update_dag',
    description='real_time_market_data_update dag',
    default_args=default_args,
    schedule_interval=(
        '* * * * 1-5' if os.environ.get('MKT_DATA') == 'on' else None
    ),
    catchup=False,
    # A single DAG run is allowed at most 15 minutes.
    dagrun_timeout=timedelta(minutes=15),
)


def scheduler():
    """Return True when the current time is inside one of the two run windows.

    The current local time (``datetime.now()``) is compared against four
    boundaries built with ``utils.trans_datetime``.
    NOTE(review): ``utils`` is not defined in this file; it presumably comes
    from ``from intraday_pd import *`` and turns a clock-time string into a
    comparable datetime for today -- confirm.

    Returns:
        bool: False if now is before the '9:30:00' boundary, after the
        '15:00:30' boundary, or strictly between the '11:30:00' and
        '13:00:30' boundaries; True otherwise.
    """
    now = datetime.now()
    session_open = utils.trans_datetime('9:30:00')
    break_start = utils.trans_datetime('11:30:00')
    break_end = utils.trans_datetime('13:00:30')
    session_close = utils.trans_datetime('15:00:30')

    # Any one of these conditions means "do not run"; the checks are kept in
    # the same order so short-circuiting matches the guard-clause version.
    outside_window = (
        now < session_open
        or now > session_close
        or break_start < now < break_end
    )
    return not outside_window


# Gate task: runs scheduler() and, per Airflow's ShortCircuitOperator
# semantics, lets downstream tasks proceed only when it returns a truthy value.
scheduler_operator = ShortCircuitOperator(
    dag=dag,
    task_id='scheduler_task',
    python_callable=scheduler,
    provide_context=False,
)

# Worker task: calls real_time_market_data with a 5-minute execution timeout.
# NOTE(review): real_time_market_data is not defined in this file; presumably
# it is provided by ``from intraday_pd import *`` -- confirm.
market_data_update_operator = PythonOperator(
    dag=dag,
    task_id='market_data_update_task',
    python_callable=real_time_market_data,
    execution_timeout=timedelta(minutes=5),
)

# scheduler_task is upstream of market_data_update_task.
scheduler_operator >> market_data_update_operator