〇Apache Airflowの画面
〇構築方法
1.以下のVagrantfileを使用して、 Apache Airflowをインストールした仮想マシン(CentOS7.5)を構築します。
Vagrantfile
VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "bento/centos-7.5"
config.vm.hostname = "co75airflow"
config.vm.provider :virtualbox do |vbox|
vbox.name = "co75airflow"
vbox.cpus = 4
vbox.memory = 4096
vbox.customize ["modifyvm", :id, "--nicpromisc2","allow-all"]
end
config.vm.network "private_network", ip: "192.168.55.115", :netmask => "255.255.255.0"
config.vm.network "public_network", ip:"192.168.1.115", :netmask => "255.255.255.0"
config.vm.provision "shell", inline: <<-SHELL
localectl set-locale LANG=ja_JP.UTF-8
timedatectl set-timezone Asia/Tokyo
# install mysql
sudo yum -y remove mariadb-libs
yum -y localinstall http://dev.mysql.com/get/mysql57-community-release-el7-7.noarch.rpm
yum -y install yum-utils
yum -y install mysql mysql-server
sudo systemctl enable mysqld.service
sudo systemctl start mysqld.service
# change password and create users and databases.
chkconfig mysqld on
service mysqld start
export MYSQL_ROOTPWD='Root123#'
export MYSQL_PWD=`cat /var/log/mysqld.log | awk '/temporary password/ {print $NF}'`
mysql -uroot --connect-expired-password -e "SET PASSWORD = PASSWORD('Root123#'); FLUSH PRIVILEGES;"
mysql -uroot -pRoot123# -e "UNINSTALL PLUGIN validate_password;"
mysql -uroot -pRoot123# -e "SET PASSWORD = PASSWORD('root'); FLUSH PRIVILEGES;"
mysql -uroot -proot -e "CREATE DATABASE airflow DEFAULT CHARACTER SET utf8;"
mysql -uroot -proot -e "CREATE USER airflow@localhost IDENTIFIED BY 'airflow';"
mysql -uroot -proot -e "GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost';"
mysql -uroot -proot -e "FLUSH PRIVILEGES;"
yum -y install epel-release
yum update
yum -y install python36 python-devel python36-devel gcc-c++ openldap-devel openssl-devel mysql-devel
yum -y install redis
systemctl enable redis
systemctl start redis
# install pipenv
yum -y install python-pip
#pip install --upgrade pip
pip install pip==9.0.1
pip install --upgrade setuptools
pip install pipenv
# install airflow.
groupadd airflow
useradd -g airflow -s /bin/bash -d /home/airflow -m airflow
mkdir -p /opt/airflow/dags
chown -R airflow:airflow /opt/airflow
sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
cd /opt/airflow
echo $HOME
pipenv --python 3.6
pipenv install
pipenv run python -V
pipenv install redis
pipenv install apache-airflow[devel,mysql,celery,cryptography]==1.9.0
wget https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/config_templates/default_airflow.cfg
cp default_airflow.cfg airflow.cfg
sed -i -e 's#sql_alchemy_conn = sqlite:///\{AIRFLOW_HOME\}/airflow.db#sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow#' airflow.cfg
sed -i -e 's/executor = SequentialExecutor/executor = CeleryExecutor/' airflow.cfg
sed -i -e 's#\{AIRFLOW_HOME\}#/opt/airflow#' airflow.cfg
sed -i -e 's#broker_url = sqla\+mysql://airflow:airflow@localhost:3306/airflow#broker_url = redis://localhost:6379#' airflow.cfg
sed -i -e 's#load_examples = True#load_examples = False#' airflow.cfg
sed -i -e 's#default_timezone = utc#default_timezone = Asia/Tokyo#' airflow.cfg
cat << EOF > gen.py
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key.decode('utf-8'), end='"')
EOF
pipenv run python gen.py > /tmp/fernet_key
echo -n 'export FERNET_KEY="' > /tmp/setfernetkey
cat /tmp/fernet_key >> /tmp/setfernetkey
AF_EOF
source /tmp/setfernetkey
sed -i -e "s/{FERNET_KEY}/$FERNET_KEY/" /opt/airflow/airflow.cfg
sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
cd /opt/airflow
pipenv run airflow initdb
# prepare a sample dag
cat << EOF > /opt/airflow/dags/dag_example1.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
operator_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 7, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='dag_example1',
default_args=operator_args,
catchup=False,
schedule_interval='0,5,10,15,20,25,30,35,40,45,50,55 * * * *'
)
task1 = BashOperator(
task_id='task1',
bash_command='echo "task1:"`date` >> /tmp/test.log',
dag=dag
)
task2 = BashOperator(
task_id='task2',
bash_command='sleep 5 && echo "task2:"`date` >> /tmp/test.log',
dag=dag
)
task1 >> task2
EOF
AF_EOF
mkdir -p /run/airflow
chown airflow:airflow /run/airflow
# setup worker service
cat << EOF > /etc/systemd/system/airflow-worker.service
[Unit]
Description=Airflow worker daemon
Requires=network.target
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow worker --pid /run/airflow/worker.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-worker.service
systemctl start airflow-worker.service
# setup scheduler service
cat << EOF > /etc/systemd/system/airflow-scheduler.service
[Unit]
Description=Airflow scheduler daemon
Requires=network.target
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow scheduler --pid /run/airflow/scheduler.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-scheduler.service
systemctl start airflow-scheduler.service
cat << EOF > /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver daemon
Requires=network.target
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-webserver.service
systemctl start airflow-webserver.service
echo 'url -> http://192.168.55.115:8080/'
SHELL
end
2. ブラウザでhttp://192.168.55.115:8080/にアクセスして、サンプルDAGのdag_example1をPause状態のOffからOnに変更します。5分程度待つとジョブが実行されます。
○関連情報
・Apache Airflowに関する他の記事はこちらを参照してください。
0 件のコメント:
コメントを投稿