2018年8月17日金曜日

VagrantでApache AirflowとPostgreSQLをインストールした仮想マシン(Debian Stretch/9.5)を構築する

Apache AirflowはPython言語のタスクスケジューラです。

〇Apache Airflowの画面


〇構築方法
1.以下のVagrantfileを使用して、 Apache AirflowとPostgreSQLをインストールした仮想マシン(Debian Stretch/9.5)を構築します。

Vagrantfile
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.box = "bento/debian-9.5"
  config.vm.hostname = "db95airflowpg"
  config.vm.provider :virtualbox do |vbox|
     vbox.name = "db95airflowpg"
     vbox.cpus = 4
     vbox.memory = 4096
     vbox.customize ["modifyvm", :id, "--nicpromisc2","allow-all"]
  end
config.vm.network "private_network", ip: "192.168.55.115", :netmask => "255.255.255.0"
config.vm.network "public_network", ip:"192.168.1.115", :netmask => "255.255.255.0"
  config.vm.provision "shell", inline: <<-SHELL
apt-get -y install task-japanese
sed -i -e 's/# ja_JP.UTF-8 UTF-8/ja_JP.UTF-8 UTF-8/' /etc/locale.gen
locale-gen
update-locale LANG=ja_JP.UTF-8
localectl set-locale LANG=ja_JP.UTF-8
localectl set-keymap jp106
timedatectl set-timezone Asia/Tokyo
apt-get update
DEBIAN_FRONTEND=noninteractive apt-get -y -o Dpkg::Options::="--force-confdef" -o Dpkg::Options::="--force-confold" upgrade

# install postgresql
apt-get -y install postgresql-9.6
echo "listen_addresses='*'" >> /etc/postgresql/9.6/main/postgresql.conf

#sed -i 's/host.*all.*all.*127.0.0.1/#host    all             all             127.0.0.1/g' /etc/postgresql/9.6/main/pg_hba.conf

echo "host    all         all         127.0.0.1/32          password" >> /etc/postgresql/9.6/main/pg_hba.conf
echo "host    all         all         192.168.1.0/24          password" >> /etc/postgresql/9.6/main/pg_hba.conf
echo "host    all         all         192.168.55.0/24          password" >> /etc/postgresql/9.6/main/pg_hba.conf

su - postgres << EOF
createdb -T template0 --locale=ja_JP.UTF-8 --encoding=UTF8 airflow
psql -c "
alter user postgres with password 'postgres';
create user airflow with password 'airflow';
grant all privileges on database airflow to airflow;
"
EOF
echo "postgres:postgres" | chpasswd
systemctl restart postgresql.service

# install psycopg2
apt-get -y install default-libmysqlclient-dev python3-dev build-essential
apt-get -y install libpq-dev python-dev
apt-get -y install redis-server

# install pipenv
apt-get -y install python3.5
apt-get -y install python-pip
#pip install --upgrade pip
pip install pip==9.0.1
pip install --upgrade setuptools
pip install pipenv


# install airflow.
groupadd airflow
useradd -g airflow -s /bin/bash -d /home/airflow -m airflow
mkdir -p /opt/airflow/dags
chown -R airflow:airflow /opt/airflow
sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
cd /opt/airflow
pipenv --python 3.5
pipenv install
pipenv run python -V
pipenv install redis
pipenv install apache-airflow[devel,postgres,celery,cryptography]==1.9.0


wget https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/config_templates/default_airflow.cfg
cp default_airflow.cfg airflow.cfg

sed -i -e 's#sql_alchemy_conn = sqlite:///\{AIRFLOW_HOME\}/airflow.db#sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow#'  airflow.cfg
sed -i -e 's/executor = SequentialExecutor/executor = CeleryExecutor/' airflow.cfg
sed -i -e 's#\{AIRFLOW_HOME\}#/opt/airflow#' airflow.cfg
sed -i -e 's#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow#broker_url = redis://localhost:6379#' airflow.cfg
sed -i -e 's#result_backend = db+mysql://airflow:airflow@localhost:3306/airflow#celery_result_backend = db+postgresql://airflow:airflow@localhost/airflow#' airflow.cfg
sed -i -e 's#load_examples = True#load_examples = False#' airflow.cfg
sed -i -e 's#default_timezone = utc#default_timezone = Asia/Tokyo#' airflow.cfg

cat << EOF > gen.py
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key.decode('utf-8'), end='"')
EOF
pipenv run python gen.py > /tmp/fernet_key
echo -n 'export FERNET_KEY="' > /tmp/setfernetkey
cat /tmp/fernet_key >> /tmp/setfernetkey
AF_EOF

source /tmp/setfernetkey
sed -i -e "s/{FERNET_KEY}/$FERNET_KEY/" /opt/airflow/airflow.cfg

sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
cd /opt/airflow
pipenv run airflow initdb

# prepare a sample dag
cat << EOF > /opt/airflow/dags/dag_example1.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

operator_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 7, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='dag_example1',
    default_args=operator_args,
    catchup=False,
    schedule_interval='0,5,10,15,20,25,30,35,40,45,50,55 * * * *'
)

task1 = BashOperator(
    task_id='task1',
    bash_command='echo "task1:"`date` >> /tmp/test.log',
    dag=dag
)

task2 = BashOperator(
    task_id='task2',
    bash_command='sleep 5 && echo "task2:"`date` >> /tmp/test.log',
    dag=dag
)

task1 >> task2
EOF
AF_EOF

mkdir -p /run/airflow
chown airflow:airflow /run/airflow

# setup worker service
cat << EOF > /etc/systemd/system/airflow-worker.service
[Unit]
Description=Airflow worker daemon
Requires=network.target

[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/usr/local/bin/pipenv run airflow worker --pid /run/airflow/worker.pid
Restart=on-failure
RestartSec=30s

[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-worker.service
systemctl start airflow-worker.service

# setup scheduler service
cat << EOF > /etc/systemd/system/airflow-scheduler.service
[Unit]
Description=Airflow scheduler daemon
Requires=network.target

[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/usr/local/bin/pipenv run airflow scheduler --pid /run/airflow/scheduler.pid
Restart=on-failure
RestartSec=30s

[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-scheduler.service
systemctl start airflow-scheduler.service

cat << EOF > /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver daemon
Requires=network.target

[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/usr/local/bin/pipenv run airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=30s

[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-webserver.service
systemctl start airflow-webserver.service

echo 'url  -> http://192.168.55.115:8080/'
SHELL
end

2. ブラウザでhttp://192.168.55.115:8080/にアクセスして、サンプルDAGのdag_example1をPause状態のOffからOnに変更します。5分程度待つとジョブが実行されます。

○関連情報
・Apache Airflowに関する他の記事はこちらを参照してください。

・psycopg2に関する他の記事はこちらを参照してください。

0 件のコメント:

コメントを投稿