2018年7月22日日曜日

VagrantでApache AirflowとMySQLをインストールした仮想マシン(CentOS7.5)を構築する

Apache AirflowはPython言語のタスクスケジューラです。

〇Apache Airflowの画面


〇構築方法
1.以下のVagrantfileを使用して、 Apache Airflowをインストールした仮想マシン(CentOS7.5)を構築します。

Vagrantfile
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.box = "bento/centos-7.5"
  config.vm.hostname = "co75airflow"
  config.vm.provider :virtualbox do |vbox|
     vbox.name = "co75airflow"
     vbox.cpus = 4
     vbox.memory = 4096
     vbox.customize ["modifyvm", :id, "--nicpromisc2","allow-all"]
  end
config.vm.network "private_network", ip: "192.168.55.115", :netmask => "255.255.255.0"
config.vm.network "public_network", ip:"192.168.1.115", :netmask => "255.255.255.0"
  config.vm.provision "shell", inline: <<-SHELL
localectl set-locale LANG=ja_JP.UTF-8
timedatectl set-timezone Asia/Tokyo

# install mysql
sudo yum -y remove mariadb-libs
yum -y localinstall http://dev.mysql.com/get/mysql57-community-release-el7-7.noarch.rpm
yum -y install yum-utils
yum -y install mysql mysql-server
sudo systemctl enable mysqld.service
sudo systemctl start mysqld.service
# change password and create users and databases.
chkconfig mysqld on
service mysqld start
export MYSQL_ROOTPWD='Root123#'
export MYSQL_PWD=`cat /var/log/mysqld.log | awk '/temporary password/ {print $NF}'`
mysql -uroot --connect-expired-password -e "SET PASSWORD = PASSWORD('Root123#'); FLUSH PRIVILEGES;"
mysql -uroot -pRoot123# -e "UNINSTALL PLUGIN validate_password;"
mysql -uroot -pRoot123# -e "SET PASSWORD = PASSWORD('root'); FLUSH PRIVILEGES;"
mysql -uroot -proot -e "CREATE DATABASE airflow DEFAULT CHARACTER SET utf8;"

mysql -uroot -proot -e "CREATE USER airflow@localhost IDENTIFIED BY 'airflow';"
mysql -uroot -proot -e "GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost';"
mysql -uroot -proot -e "FLUSH PRIVILEGES;"

yum -y install epel-release
yum update
yum -y install python36 python-devel python36-devel gcc-c++ openldap-devel openssl-devel mysql-devel
yum -y install redis
systemctl enable redis
systemctl start redis

# install pipenv
yum -y install python-pip
#pip install --upgrade pip
pip install pip==9.0.1
pip install --upgrade setuptools
pip install pipenv


# install airflow.
groupadd airflow
useradd -g airflow -s /bin/bash -d /home/airflow -m airflow

mkdir -p /opt/airflow/dags
chown -R airflow:airflow /opt/airflow
sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
cd /opt/airflow
echo $HOME
pipenv --python 3.6
pipenv install
pipenv run python -V
pipenv install redis
pipenv install apache-airflow[devel,mysql,celery,cryptography]==1.9.0

wget https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/config_templates/default_airflow.cfg
cp default_airflow.cfg airflow.cfg

sed -i -e 's#sql_alchemy_conn = sqlite:///\{AIRFLOW_HOME\}/airflow.db#sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow#'  airflow.cfg
sed -i -e 's/executor = SequentialExecutor/executor = CeleryExecutor/' airflow.cfg
sed -i -e 's#\{AIRFLOW_HOME\}#/opt/airflow#' airflow.cfg
sed -i -e 's#broker_url = sqla\+mysql://airflow:airflow@localhost:3306/airflow#broker_url = redis://localhost:6379#' airflow.cfg
sed -i -e 's#load_examples = True#load_examples = False#' airflow.cfg
sed -i -e 's#default_timezone = utc#default_timezone = Asia/Tokyo#' airflow.cfg

cat << EOF > gen.py
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key.decode('utf-8'), end='"')
EOF
pipenv run python gen.py > /tmp/fernet_key
echo -n 'export FERNET_KEY="' > /tmp/setfernetkey
cat /tmp/fernet_key >> /tmp/setfernetkey
AF_EOF

source /tmp/setfernetkey
sed -i -e "s/{FERNET_KEY}/$FERNET_KEY/" /opt/airflow/airflow.cfg

sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
cd /opt/airflow
pipenv run airflow initdb

# prepare a sample dag
cat << EOF > /opt/airflow/dags/dag_example1.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

operator_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 7, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='dag_example1',
    default_args=operator_args,
    catchup=False,
    schedule_interval='0,5,10,15,20,25,30,35,40,45,50,55 * * * *'
)

task1 = BashOperator(
    task_id='task1',
    bash_command='echo "task1:"`date` >> /tmp/test.log',
    dag=dag
)

task2 = BashOperator(
    task_id='task2',
    bash_command='sleep 5 && echo "task2:"`date` >> /tmp/test.log',
    dag=dag
)

task1 >> task2
EOF
AF_EOF

mkdir -p /run/airflow
chown airflow:airflow /run/airflow

# setup worker service
cat << EOF > /etc/systemd/system/airflow-worker.service
[Unit]
Description=Airflow worker daemon
Requires=network.target

[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow worker --pid /run/airflow/worker.pid
Restart=on-failure
RestartSec=30s

[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-worker.service
systemctl start airflow-worker.service

# setup scheduler service
cat << EOF > /etc/systemd/system/airflow-scheduler.service
[Unit]
Description=Airflow scheduler daemon
Requires=network.target

[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow scheduler --pid /run/airflow/scheduler.pid
Restart=on-failure
RestartSec=30s

[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-scheduler.service
systemctl start airflow-scheduler.service

cat << EOF > /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver daemon
Requires=network.target

[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=30s

[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-webserver.service
systemctl start airflow-webserver.service

echo 'url  -> http://192.168.55.115:8080/'
SHELL
end

2. ブラウザでhttp://192.168.55.115:8080/にアクセスして、サンプルDAGのdag_example1をPause状態のOffからOnに変更します。5分程度待つとジョブが実行されます。

○関連情報
・Apache Airflowに関する他の記事はこちらを参照してください。

0 件のコメント:

コメントを投稿