Apache Airflowは、Pythonで実装されたワークフロー(タスク)スケジューラです。
〇Apache Airflowの画面
〇構築方法
1. 以下のVagrantfileを使用して、Apache Airflow・PostgreSQL・Redisをインストールした仮想マシン(Ubuntu 18.04)を構築します。
Vagrantfile
VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "bento/ubuntu-18.04"
config.vm.hostname = "ub1804airflowpg"
config.vm.provider :virtualbox do |vbox|
vbox.name = "ub1804airflowpg"
vbox.cpus = 4
vbox.memory = 4096
vbox.customize ["modifyvm", :id, "--nicpromisc2","allow-all"]
end
config.vm.network "private_network", ip: "192.168.55.115", :netmask => "255.255.255.0"
config.vm.network "public_network", ip:"192.168.1.115", :netmask => "255.255.255.0"
config.vm.provision "shell", inline: <<-SHELL
# Enable the Japanese UTF-8 locale, the jp106 keymap and the Tokyo time zone.
sed -i -e 's/# ja_JP.UTF-8 UTF-8/ja_JP.UTF-8 UTF-8/' /etc/locale.gen
locale-gen
localectl set-locale LANG=ja_JP.UTF-8
localectl set-keymap jp106
timedatectl set-timezone Asia/Tokyo
# Refresh the package index first so the upgrade is resolved against current
# package lists rather than whatever index the base box shipped with.
apt-get update
# Unattended upgrade: keep locally modified config files (confdef/confold)
# instead of prompting.
DEBIAN_FRONTEND=noninteractive apt-get -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" upgrade
# install postgresql
# Register the PostgreSQL apt repository for this Ubuntu release and trust its
# signing key before refreshing the package index.
echo "deb http://apt.postgresql.org/pub/repos/apt/ $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list
apt-get -y install wget ca-certificates
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add -
apt-get update
# Provisioning runs unattended: without -y and a non-interactive frontend the
# upgrade would stop at the confirmation prompt.
DEBIAN_FRONTEND=noninteractive apt-get -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" upgrade
apt-get -y install postgresql-10
# Listen on every interface and allow password logins from localhost and from
# the two VM networks.
echo "listen_addresses='*'" >> /etc/postgresql/10/main/postgresql.conf
# NOTE(review): pg_hba.conf is matched top-down; if the stock file already has
# a 127.0.0.1 rule, it takes precedence over the one appended below. The
# disabled sed line was an attempt to comment that stock rule out.
#sed -i 's/host.*all.*all.*127.0.0.1/#host all all 127.0.0.1/g' /etc/postgresql/10/main/pg_hba.conf
echo "host all all 127.0.0.1/32 password" >> /etc/postgresql/10/main/pg_hba.conf
echo "host all all 192.168.1.0/24 password" >> /etc/postgresql/10/main/pg_hba.conf
echo "host all all 192.168.55.0/24 password" >> /etc/postgresql/10/main/pg_hba.conf
# Create the airflow database and role as the postgres superuser. The
# delimiter is quoted so the SQL is passed through without shell expansion.
su - postgres << 'EOF'
createdb -T template0 --locale=ja_JP.UTF-8 --encoding=UTF8 airflow
psql -c "
alter user postgres with password 'postgres';
create user airflow with password 'airflow';
grant all privileges on database airflow to airflow;
"
EOF
# Set the OS-level password of the postgres account as well.
echo "postgres:postgres" | chpasswd
# Restart so the listen_addresses and pg_hba.conf changes take effect.
systemctl restart postgresql.service
# Headers and toolchain needed to build the Python database drivers.
apt-get install --yes libmysqlclient-dev python3-dev build-essential
apt-get install --yes libpq-dev python-dev
# Redis server (referenced as the Celery broker in airflow.cfg).
apt-get install --yes redis-server
# install pipenv
apt-get install --yes python-pip
# pip is pinned to 9.0.1 here rather than upgraded to the latest release.
pip install pip==9.0.1
pip install -U setuptools
pip install pipenv
# install airflow.
# Dedicated airflow account that owns /opt/airflow (used as AIRFLOW_HOME).
groupadd airflow
useradd -g airflow -s /bin/bash -d /home/airflow -m airflow
mkdir -p /opt/airflow/dags
chown -R airflow:airflow /opt/airflow
# Run the installation as the airflow user. Both heredoc delimiters are quoted
# so that nothing in the body is expanded by the provisioning (root) shell;
# variables such as HOME are resolved by the airflow user's shell instead.
sudo -u airflow /bin/bash << 'AF_EOF'
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
# Stop here if the directory is missing instead of installing somewhere else.
cd /opt/airflow || exit 1
echo "$HOME"
# NOTE(review): Ubuntu 18.04 ships Python 3.6 by default; confirm that a 3.5
# interpreter is actually available to pipenv on this box.
pipenv --python 3.5
pipenv install
pipenv run python -V
pipenv install redis
pipenv install psycopg2
# Quoted: the extras list in square brackets would otherwise be a glob pattern.
pipenv install 'apache-airflow[devel,mysql,celery,cryptography]==1.9.0'
# NOTE(review): the template comes from the master branch while the package is
# pinned to 1.9.0; the sed patterns below depend on the template's wording.
wget https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/config_templates/default_airflow.cfg
cp default_airflow.cfg airflow.cfg
# Point Airflow at PostgreSQL, switch to the Celery executor with a Redis
# broker, disable the example DAGs and set the default time zone.
sed -i -e 's#sql_alchemy_conn = sqlite:///\{AIRFLOW_HOME\}/airflow.db#sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow#' airflow.cfg
sed -i -e 's/executor = SequentialExecutor/executor = CeleryExecutor/' airflow.cfg
sed -i -e 's#\{AIRFLOW_HOME\}#/opt/airflow#' airflow.cfg
sed -i -e 's#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow#broker_url = redis://localhost:6379#' airflow.cfg
sed -i -e 's#result_backend = db+mysql://airflow:airflow@localhost:3306/airflow#celery_result_backend = db+postgresql://airflow:airflow@localhost/airflow#' airflow.cfg
sed -i -e 's#load_examples = True#load_examples = False#' airflow.cfg
sed -i -e 's#default_timezone = utc#default_timezone = Asia/Tokyo#' airflow.cfg
# Generate a Fernet key. gen.py prints the key followed by a closing double
# quote and no newline, so that /tmp/setfernetkey ends up holding a complete
# export statement for the provisioning shell to source.
cat << 'EOF' > gen.py
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key.decode('utf-8'), end='"')
EOF
pipenv run python gen.py > /tmp/fernet_key
echo -n 'export FERNET_KEY="' > /tmp/setfernetkey
cat /tmp/fernet_key >> /tmp/setfernetkey
AF_EOF
# Load FERNET_KEY from the export statement prepared by the airflow user and
# substitute it for the {FERNET_KEY} placeholder in airflow.cfg.
source /tmp/setfernetkey
sed -i -e "s/{FERNET_KEY}/${FERNET_KEY}/" /opt/airflow/airflow.cfg
# The key is a secret: do not leave copies of it behind in /tmp.
rm -f /tmp/fernet_key /tmp/setfernetkey
# Initialise the Airflow metadata database and install a sample DAG, as the
# airflow user. Both heredoc delimiters are quoted: the DAG's bash_command
# strings contain backticks that must reach the DAG file literally. With
# unquoted delimiters the provisioning shell would execute them once, at build
# time, and write a frozen timestamp into the DAG.
sudo -u airflow /bin/bash << 'AF_EOF'
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
cd /opt/airflow || exit 1
pipenv run airflow initdb
# prepare a sample dag
cat << 'EOF' > /opt/airflow/dags/dag_example1.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
operator_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 7, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    dag_id='dag_example1',
    default_args=operator_args,
    catchup=False,
    schedule_interval='0,5,10,15,20,25,30,35,40,45,50,55 * * * *'
)
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "task1:"`date` >> /tmp/test.log',
    dag=dag
)
task2 = BashOperator(
    task_id='task2',
    bash_command='sleep 5 && echo "task2:"`date` >> /tmp/test.log',
    dag=dag
)
task1 >> task2
EOF
AF_EOF
# Directory for the PID files passed to the Airflow services via --pid.
mkdir -p /run/airflow
chown airflow:airflow /run/airflow
# /run is a tmpfs and starts out empty on every boot, so also register the
# directory with systemd-tmpfiles to have it recreated after a reboot.
echo 'd /run/airflow 0755 airflow airflow -' > /etc/tmpfiles.d/airflow.conf
# setup worker service
# Runs "airflow worker" through pipenv as the airflow user and restarts it 30
# seconds after a failure. Requires= alone does not order units, so After= is
# added to start the worker only after the network, PostgreSQL and Redis.
# The delimiter is quoted so the unit text is written out verbatim.
cat << 'EOF' > /etc/systemd/system/airflow-worker.service
[Unit]
Description=Airflow worker daemon
Requires=network.target
After=network.target postgresql.service redis-server.service
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/usr/local/bin/pipenv run airflow worker --pid /run/airflow/worker.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-worker.service
systemctl start airflow-worker.service
# setup scheduler service
# Runs "airflow scheduler" through pipenv as the airflow user and restarts it
# 30 seconds after a failure. Requires= alone does not order units, so After=
# is added to start the scheduler only after the network, PostgreSQL and Redis.
# The delimiter is quoted so the unit text is written out verbatim.
cat << 'EOF' > /etc/systemd/system/airflow-scheduler.service
[Unit]
Description=Airflow scheduler daemon
Requires=network.target
After=network.target postgresql.service redis-server.service
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/usr/local/bin/pipenv run airflow scheduler --pid /run/airflow/scheduler.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-scheduler.service
systemctl start airflow-scheduler.service
# setup webserver service
# Runs "airflow webserver" through pipenv as the airflow user and restarts it
# 30 seconds after a failure. Requires= alone does not order units, so After=
# is added to start the webserver only after the network, PostgreSQL and Redis.
# The delimiter is quoted so the unit text is written out verbatim.
cat << 'EOF' > /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver daemon
Requires=network.target
After=network.target postgresql.service redis-server.service
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/usr/local/bin/pipenv run airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-webserver.service
systemctl start airflow-webserver.service
# Tell the operator where the web UI can be reached.
echo 'url -> http://192.168.55.115:8080/'
SHELL
end
2. ブラウザで http://192.168.55.115:8080/ にアクセスし、サンプルDAG「dag_example1」のトグルをOff(Pause状態)からOnに切り替えます。DAGは5分間隔でスケジュールされているため、最大5分程度待つとジョブが実行されます。
〇関連情報
・Apache Airflowに関する他の記事は、こちらを参照してください。
・psycopg2に関する他の記事は、こちらを参照してください。