〇Apache Airflowの画面
〇構築方法
1.以下のVagrantfileを使用して、Apache AirflowとPostgreSQLをインストールした仮想マシン(CentOS7.5)を構築します。
Vagrantfile
# Builds a CentOS 7.5 VM running Apache Airflow 1.9.0 (CeleryExecutor) with
# PostgreSQL 10 as the metadata DB and Redis as the Celery message broker.
# The web UI is served on the private network at http://192.168.55.115:8080/.
VAGRANTFILE_API_VERSION = "2"
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "bento/centos-7.5"
config.vm.hostname = "co75airflowpg"
config.vm.provider :virtualbox do |vbox|
vbox.name = "co75airflowpg"
vbox.cpus = 4
vbox.memory = 4096
vbox.customize ["modifyvm", :id, "--nicpromisc2","allow-all"]
end
config.vm.network "private_network", ip: "192.168.55.115", :netmask => "255.255.255.0"
config.vm.network "public_network", ip:"192.168.1.115", :netmask => "255.255.255.0"
config.vm.provision "shell", inline: <<-SHELL
localectl set-locale LANG=ja_JP.UTF-8
timedatectl set-timezone Asia/Tokyo
# download and install postgresql 10.
wget https://download.postgresql.org/pub/repos/yum/10/redhat/rhel-7-x86_64/pgdg-centos10-10-2.noarch.rpm
rpm -Uvh pgdg-centos10-10-2.noarch.rpm
yum -y update
yum -y install postgresql10-server postgresql10-devel postgresql10-contrib
systemctl enable postgresql-10
# initialize postgresql server and switch pg_hba.conf from ident to
# password auth for localhost and both VM networks.
/usr/pgsql-10/bin/postgresql-10-setup initdb
echo "listen_addresses='*'" >> /var/lib/pgsql/10/data/postgresql.conf
sed -i 's/host.*all.*all.*127.0.0.1/#host all all 127.0.0.1/g' /var/lib/pgsql/10/data/pg_hba.conf
sed -i 's#^host.*all.*all.*::1/128.*ident#host all all ::1/128 password#g' /var/lib/pgsql/10/data/pg_hba.conf
echo "host all all 127.0.0.1/32 password" >> /var/lib/pgsql/10/data/pg_hba.conf
echo "host all all 192.168.1.0/24 password" >> /var/lib/pgsql/10/data/pg_hba.conf
echo "host all all 192.168.55.0/24 password" >> /var/lib/pgsql/10/data/pg_hba.conf
systemctl start postgresql-10.service
# create users and databases...
su - postgres << EOF
psql -c "
alter user postgres with password 'postgres';
"
createdb -T template0 --locale=ja_JP.UTF-8 --encoding=UTF8 airflow
EOF
# fix: libpq reads PGPASSWORD; the original PGPASSWD was silently ignored.
export PGPASSWORD=postgres
systemctl restart postgresql-10.service
su - postgres << EOF
psql -c "
create user airflow with password 'airflow';
alter database airflow owner to airflow;
"
EOF
yum -y install epel-release
# fix: without -y, yum answers "no" when run non-interactively and the
# update was silently skipped during provisioning.
yum -y update
yum -y install postgresql-devel
yum -y install python36 python-devel python36-devel gcc-c++ openldap-devel openssl-devel mysql-devel
yum -y install redis
systemctl enable redis
systemctl start redis
# install pipenv (pip deliberately pinned to 9.0.1 instead of upgrading).
yum -y install python-pip
#pip install --upgrade pip
pip install pip==9.0.1
pip install --upgrade setuptools
pip install pipenv
# install airflow under a dedicated "airflow" user; AIRFLOW_HOME=/opt/airflow.
groupadd airflow
useradd -g airflow -s /bin/bash -d /home/airflow -m airflow
mkdir -p /opt/airflow/dags
chown -R airflow:airflow /opt/airflow
sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
cd /opt/airflow
# NOTE(review): the AF_EOF delimiter is unquoted, so dollar-variables in this
# block expand in the outer root shell; the echo below therefore prints the
# provisioning shell's HOME, not /home/airflow.
echo $HOME
pipenv --python 3.6
pipenv install
pipenv run python -V
pipenv install psycopg2
pipenv install redis
# fix: quote the requirement so [] is not treated as a shell glob pattern.
pipenv install "apache-airflow[devel,mysql,celery,cryptography]==1.9.0"
wget https://raw.githubusercontent.com/apache/incubator-airflow/master/airflow/config_templates/default_airflow.cfg
cp default_airflow.cfg airflow.cfg
sed -i -e 's#sql_alchemy_conn = sqlite:///\{AIRFLOW_HOME\}/airflow.db#sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow#' airflow.cfg
sed -i -e 's/executor = SequentialExecutor/executor = CeleryExecutor/' airflow.cfg
sed -i -e 's#\{AIRFLOW_HOME\}#/opt/airflow#' airflow.cfg
sed -i -e 's#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow#broker_url = redis://localhost:6379#' airflow.cfg
sed -i -e 's#result_backend = db+mysql://airflow:airflow@localhost:3306/airflow#celery_result_backend = db+postgresql://airflow:airflow@localhost/airflow#' airflow.cfg
sed -i -e 's#load_examples = True#load_examples = False#' airflow.cfg
sed -i -e 's#default_timezone = utc#default_timezone = Asia/Tokyo#' airflow.cfg
# generate a fernet key and stash it in /tmp for the outer root shell; the
# python script intentionally prints a trailing double quote to terminate the
# export statement assembled in /tmp/setfernetkey.
cat << EOF > gen.py
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key.decode('utf-8'), end='"')
EOF
pipenv run python gen.py > /tmp/fernet_key
echo -n 'export FERNET_KEY="' > /tmp/setfernetkey
cat /tmp/fernet_key >> /tmp/setfernetkey
AF_EOF
source /tmp/setfernetkey
sed -i -e "s/{FERNET_KEY}/$FERNET_KEY/" /opt/airflow/airflow.cfg
sudo -u airflow /bin/bash << AF_EOF
export AIRFLOW_HOME=/opt/airflow
export HOME=/home/airflow
cd /opt/airflow
pipenv run airflow initdb
# prepare a sample dag: task1 then task2, scheduled every 5 minutes.
cat << EOF > /opt/airflow/dags/dag_example1.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
operator_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 7, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='dag_example1',
default_args=operator_args,
catchup=False,
schedule_interval='0,5,10,15,20,25,30,35,40,45,50,55 * * * *'
)
task1 = BashOperator(
task_id='task1',
bash_command='echo "task1:"`date` >> /tmp/test.log',
dag=dag
)
task2 = BashOperator(
task_id='task2',
bash_command='sleep 5 && echo "task2:"`date` >> /tmp/test.log',
dag=dag
)
task1 >> task2
EOF
AF_EOF
mkdir -p /run/airflow
chown airflow:airflow /run/airflow
# setup worker service
cat << EOF > /etc/systemd/system/airflow-worker.service
[Unit]
Description=Airflow worker daemon
Requires=network.target
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow worker --pid /run/airflow/worker.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-worker.service
systemctl start airflow-worker.service
# setup scheduler service
cat << EOF > /etc/systemd/system/airflow-scheduler.service
[Unit]
Description=Airflow scheduler daemon
Requires=network.target
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow scheduler --pid /run/airflow/scheduler.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-scheduler.service
systemctl start airflow-scheduler.service
# setup webserver service
cat << EOF > /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Airflow webserver daemon
Requires=network.target
[Service]
User=airflow
Group=airflow
Type=simple
WorkingDirectory=/opt/airflow
Environment=AIRFLOW_HOME=/opt/airflow
ExecStart=/bin/pipenv run airflow webserver --pid /run/airflow/webserver.pid
Restart=on-failure
RestartSec=30s
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow-webserver.service
systemctl start airflow-webserver.service
echo 'url -> http://192.168.55.115:8080/'
SHELL
end
2. ブラウザでhttp://192.168.55.115:8080/にアクセスして、サンプルDAGのdag_example1をPause状態のOffからOnに変更します。5分程度待つとジョブが実行されます。
〇関連情報
・Apache Airflowに関する他の記事はこちらを参照してください。
・psycopg2に関する他の記事はこちらを参照してください。
0 件のコメント:
コメントを投稿