Kubernetes Cluster on Raspberry Pi zero using Cluster Hat

Once I've read this blog(3日間クッキング【Kubernetes のラズペリーパイ包み “サイバーエージェント風”】), I wanted to deploy k8s cluster on Raspberry Pi.
However, It's too expensive for me in this blog's way. Need three Raspberry Pi 3 Model B and switching hub ... etc.
So I've created k8s cluster Raspberry Pi Zero, but It was very hard for me than I thought.
I'll introduce the creation of k8s cluster on raspberry pi zero in this time.
And this is totally useless because of using the older version, and not for production. In addition unstable behavior and very slowly.
I made only for my pleasure. If you're interested in, make it.

f:id:cloudfish:20190827155059j:plain

Component List

Prepare the following components.
We need at least one Raspberry Pi Zero

  • Cluster Hat v2.3 x 1 (23.33£)
  • Raspberry Pi B+ x 1 ($35)
  • Raspberry Pi Zero x 1-4 (3.88£/per)
  • Micro SD Card x 2-5
  • Power Cable x 1
  • Lan Cable or USB Wifi Adaptor

※You can buy Cluster Hat and Raspberry Pi in Pimoroni(https://shop.pimoroni.com/)

Prerequisite

Component Role

Hardware Role of ClusterHat Role of K8S
Raspberry Pi B+ Controller Control-Plane
Raspberry Pi Zero x (1 - 4) Node Node
ClusterHat - -

Raspberry Pi Zero( or Zero W)
This is my environment(actually, I used 3 raspberry pi zero)
f:id:cloudfish:20190827173701p:plain

1. Create Raspberry Pi Zero Image for ClusterHat and k8s

1-1. Create old raspbian image

Create Raspberry Pi Zero image from 2017-11-29-raspbian-stretch-lite because latest raspbian image can unavailable cpuset.
execute the following command in Controller Node of ClusterHat

# wget http://ftp.jaist.ac.jp/pub/raspberrypi/raspbian_lite/images/raspbian_lite-2017-12-01/2017-11-29-raspbian-stretch-lite.zip
# unzip 2017-11-29-raspbian-stretch-lite.zip
# apt install kpartx
# git clone https://github.com/burtyb/clusterhat-image
# cd clusterhat-image/build
# mkdir {img,mnt,dest,mnt2}
# mv 2017-11-29-raspbian-stretch-lite.img img/
# create.sh 2017-11-29

if it is well, the Raspbian image should be in dest directory.

ClusterCTRL-2017-11-29-lite-1-CNAT.img
ClusterCTRL-2017-11-29-lite-1-p1.img 
ClusterCTRL-2017-11-29-lite-1-p2.img
ClusterCTRL-2017-11-29-lite-1-p3.img
ClusterCTRL-2017-11-29-lite-1-p4.img

1-2. Write the image to micro sd

Download this Raspbian image to local.
Write CNAT.img to Controller Node(Raspberry Pi B+), others to the node(Raspberry Pi Zero).
Create ssh file at boot directory for connecting ssh each node.
If you're on a Mac, execute the following command after writing.

# touch /Volumes/boot/ssh

1-3. Start Controller Node

Confirm connecting to Controller Node
From local machine.

# ssh pi@[controller ip_addr]

From Controller Node

# ssh pi@[172.19.181.1-4]

Password is raspberry

2. Setting Node of ClusterHat

2-1. Install library to Controller Node

Install python-smbus because lacked library for clusterhat command. Execute the following command.

# apt-get install python-smbus


After install, execute clusterhat command.

# clusterhat status

2-2. Setting Nat

For Node connect to the internet, Setting Nat.
Execute the following command

# apt-get -y install iptables-persistent
# iptables -t nat -A POSTROUTING -s 172.19.181.0/24 ! -o brint -j MASQUERADE
# iptables -A FORWARD -i brint ! -o brint -j ACCEPT
# iptables -A FORWARD -o brint -m conntrack --ctstate  RELATED,ESTABLISHED -j ACCEPT
# sh -c "iptables-save > /etc/iptables/rules.v4"

3. Setting All Node of ClusterHat

3-1. Hold kernel version

# apt-mark hold raspberrypi-kernel

3-2. upgrade

# apt-get upgrade -s

Check it has not changed kernel version

# uname -a
Linux p1 4.9.59+ #1047 Sun Oct 29 11:47:10 GMT 2017 armv6l GNU/Linux

3-3. Install Docker

Install with fixed version(18.06.1~ce~3-0~raspbian)

# curl -sSL https://get.docker.com | sh
# usermod -aG docker pi
# apt-get install docker-ce=18.06.1~ce~3-0~raspbian

If docker command not well, reinstall docker.

# apt-get remove docker-ce
# apt-get install docker-ce=18.06.1~ce~3-0~raspbian

3-4. Hold Docker version

# apt-mark hold docker-ce

3-5. Install Kubernetes

Install kubernetes with below version.

# curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - 
# echo "deb http://apt.kubernetes.io/ kubernetes-xenial main" > /etc/apt/sources.list.d/kubernetes.list
# apt-get update
# apt-cache madison kubeadm
# apt-get install kubelet=1.5.6-00 kubeadm=1.5.6-00 kubectl=1.5.6-00 kubernetes-cni=0.5.1-00

4. Setting the Control-Plane Node(Controller Node of ClusterHat)

4-1. Initialize a Kubernetes Control-Plane node

Execute the following command on control-plane

# kubeadm init --pod-network-cidr 10.244.0.0/16
Probably it should stop at the following log (in 5-10minitus)
<master/apiclient> created API client, waiting for the control plane to become ready
Check the logs,you should see the following logs
# journalctl -eu kubelet
"error: failed to run Kubelet: unable to load client CA file /etc/kubernetes/pki/ca.crt: open /etc/kubernetes/pki/ca.crt"

※because creating ca(ca.pem) file name and refference ca(ca.crt) file name are different

Solution of this trouble

Change file name as follows

cd /etc/kubernetes/pki
cp ca.pem ca.crt

After change file name, waiting for a while.
if it well, memo the following such a output.

# kubeadm join --token=18b644.0438585f9e20e864 192.168.0.109

4-2. Install flannel

Create file as follows
kube-flannel.yml

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flannel
  namespace: kube-system
---
kind: ConfigMap
apiVersion: v1
metadata:
  name: kube-flannel-cfg
  namespace: kube-system
  labels:
    tier: node
    app: flannel
data:
  cni-conf.json: |
    {
      "name": "cbr0",
      "type": "flannel",
      "delegate": {
        "isDefaultGateway": true
      }
    }
  net-conf.json: |
    {
      "Network": "10.244.0.0/16",
      "Backend": {
        "Type": "vxlan"
      }
    }
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  name: kube-flannel-ds
  namespace: kube-system
  labels:
    tier: node
    app: flannel
spec:
  template:
    metadata:
      labels:
        tier: node
        app: flannel
    spec:
      hostNetwork: true
      nodeSelector:
        beta.kubernetes.io/arch: arm
      tolerations:
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      serviceAccountName: flannel
      containers:
      - name: kube-flannel
        image: quay.io/coreos/flannel:v0.7.1-arm
        command: [ "/opt/bin/flanneld", "--ip-masq", "--kube-subnet-mgr" ]
        securityContext:
          privileged: true
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        volumeMounts:
        - name: run
          mountPath: /run
        - name: flannel-cfg
          mountPath: /etc/kube-flannel/
      - name: install-cni
        image: quay.io/coreos/flannel:v0.7.1-arm
        command: [ "/bin/sh", "-c", "set -e -x; cp -f /etc/kube-flannel/cni-conf.json /etc/cni/net.d/10-flannel.conf; while true; do sleep 3600; done" ]
        volumeMounts:
        - name: cni
          mountPath: /etc/cni/net.d
        - name: flannel-cfg
          mountPath: /etc/kube-flannel/
      volumes:
        - name: run
          hostPath:
            path: /run
        - name: cni
          hostPath:
            path: /etc/cni/net.d
        - name: flannel-cfg
          configMap:
            name: kube-flannel-cfg

Execute the following command.

#kubectl create --validate=false -f kube-flannel.yml

※ it takes a long time(about 30〜40minutes)

5. Setting the Kubernetes Node(Node of ClusterHat)

5-1. Copy ca.crt from Control-Plane

Download ca.crt from Control-Plane, and Upload the file to node.

scp pi@[control-plane ip_addr]:/etc/kubernetes/pki/ca.crt .
scp pi@[node ip_addr]:ca.crt /etc/kubernetes/pki/ca.crt

because when executed join command, not created ca.crt(probably bug)
if you execute join command without this 5-1, you should face the following trouble.

after executing join command, the following output.

Node joins complete:

but node not joined.

Check the log.

# journalctl -eu kubelet
"error: failed to run Kubelet: unable to load client CA file /etc/kubernetes/pki/ca.crt"

5-2. Join to Control-Plane

Execute the following command(at 4-1's command) each node.

# kubeadm join --token=18b644.0438585f9e20e864 192.168.0.109


6. Check if the installation was successful
If you can see the following output, you got success.

# kubectl get nodes
NAME      STATUS         AGE
cnat      Ready,master   2h
p1        Ready          1m
p2        Ready          9m
p3        Ready          19m

and look this command output

# kubectl get pods --all-namespaces -o wide
NAMESPACE     NAME                              READY     STATUS    RESTARTS   AGE       IP              NODE
kube-system   dummy-2501624643-8jrkj            1/1       Running   0          2h        192.168.0.109   cnat
kube-system   etcd-cnat                         1/1       Running   0          2h        192.168.0.109   cnat
kube-system   kube-apiserver-cnat               1/1       Running   0          2h        192.168.0.109   cnat
kube-system   kube-controller-manager-cnat      1/1       Running   0          2h        192.168.0.109   cnat
kube-system   kube-discovery-2202902116-pwwaf   1/1       Running   0          2h        192.168.0.109   cnat
kube-system   kube-dns-2334855451-aq9pj         3/3       Running   1          2h        10.244.0.2      cnat
kube-system   kube-flannel-ds-5tz4s             2/2       Running   2          22m       172.19.181.3    p3
kube-system   kube-flannel-ds-k5vsc             2/2       Running   0          3m        172.19.181.1    p1
kube-system   kube-flannel-ds-kasde             2/2       Running   0          1h        192.168.0.109   cnat
kube-system   kube-flannel-ds-uumv1             2/2       Running   2          11m       172.19.181.2    p2
kube-system   kube-proxy-2ku07                  1/1       Running   0          2h        192.168.0.109   cnat
kube-system   kube-proxy-d9xxo                  1/1       Running   0          22m       172.19.181.3    p3
kube-system   kube-proxy-dz146                  1/1       Running   0          11m       172.19.181.2    p2
kube-system   kube-proxy-gvcs1                  1/1       Running   0          3m        172.19.181.1    p1
kube-system   kube-scheduler-cnat               1/1       Running   0          2h        192.168.0.109   cnat

What kind of trouble occurred

1. Unavailable cpuset on latest raspbian image (1-1)

created old raspbian image with being able to use cpuset.

2. Can't initialize kubernetes on kubeadm(4-1)

kubeadm created ca(ca.pem) file, also kubeadm reference ca(ca.crt) file, but both file name is not same.
So Renamed the ca.pem file to the ca.crt.

3. Can't deploy the latest flannel(4-2)

The latest flannel can unavailable in this version of kubeadm.
I used an old flannel version.

4. Node can't join to Control-Plane(5-1)

When execute join command, node can't join to Control-Node.
The cause of this problem is the node doesn't have ca.crt. Probably should be created automatically when joining.
This can be solved by copying the ca.crt from Control-Plane to Node

AWS Client VPN設定のハマりどころ

AWS Client VPNが東京リージョンでも使えるようになりましたね。
これでVPNサーバを立てる手間が減らせるので、費用面は考慮しつつうまく活用していきたいですね。
今回はClient VPNを検証した際に設定でわかりにくかった箇所があったので備忘録として残しておきたいと思います。

Client VPN設定方法

設定方法については、AWSのドキュメントやブログで色々出ていますのでそれを参照して設定してください。
クライアント VPN の使用開始 - AWS Client VPN
[AWS]踏み台をワンチャンなくせる!?VPC接続にClient VPNを使ってみよう | DevelopersIO

構成

以下の構成で検証しました
f:id:cloudfish:20190725123039p:plain

ハマったポイント

設定後にClient端末からSSHが通らない

VPNは正常に接続できるものの、SSH接続ができない状態となりました。
ルーティングについてはドキュメント通りに設定しており、NACLについては特に設定していませんでした。
セキュリティグループについては、VPCE、EC2のSGそれぞれで以下のとおりClientのCIDR範囲を許可していました。
■SG_VPCE

Port Source
22 192.168.0.0/16

■SG_EC2

Port Source
22 192.168.0.0/16

設定に問題がないか再確認しつつ、切り分けのためにいったんそれぞれのSGを0.0.0.0でフル解放してSSH接続してみました。
結果は接続が成功し、secureログで接続元のIPを確認すると、172.31.16.xxxとなっておりClientVPNのVPCEndpointのIPであることが分かりました。
Client VPNでの接続については、Client端末の接続がVPCEndpointのIPでNatされるようなので、Client端末のIPをSGで許可しても意味がありませんでした。

上記を踏まえて、以下のようにSGを設定しました。
■SG_VPCE

Port Source
22 192.168.0.0/16

■SG_EC2

Port Source
22 SG_VPCE

Client端末からインターネット接続できない

SSH接続ができない問題を対応中に、Client端末からインターネット接続をしようとしましたがドキュメントどおりの設定ではインターネット接続ができないようになっていました。
ルーティング設定かと考え、0.0.0.0/0を追加しましたが接続できませんでした。
もう少し調べたところ、承認設定に0.0.0.0/0を追加が必要になることがわかったので、設定したところVPN接続しながらインターネット接続ができるようになりました。

まとめ

AWS ClientVPNはサーバをきどうする必要もないため手軽に起動できますが、承認設定のような概念が分かりづらい設定もあり少し引っかかりました。トータルではサーバの面倒を見る必要がないという大きなアドバンテージがあるので積極的に活用していきたいと思います。

CO2濃度計をRaspberry Pi Zeroで作ってみた

車の運転中にCO2濃度が高くなると集中力が落ち眠くなりやすくなるそうです。
そこで、車でCO2濃度が測定できるようにRaspberry Piを使って車載用にCO2濃度計を作ることにしました。
まだやりたいことが全部できたわけではないですが、個人的に使うには十分なところまでできましたので作成方法について紹介したいと思います。
f:id:cloudfish:20190529193915j:plain

機能

最終的に実装した機能は以下になります。

  • 定期的にCO2濃度を測定し小型液晶に表示
  • CO2濃度が閾値を超えていたら警告音を鳴らす
  • webで濃度の推移グラフ(1分ごと、1時間ごと)を可視化する
  • CO2濃度のログをAWSのS3に送信(とりあえず蓄積するだけ)

構成

f:id:cloudfish:20190611155546p:plain

必要部品

部品 数量 用途 金額(目安)
Raspberry Pi Zero W 1   1300円
MH-Z19 1 CO2センサー 3800円
PiOled 1 表示用小型液晶 3800円
圧電スピーカ 1 アラーム用 100円

各部品の接続

接続図

f:id:cloudfish:20190611151425p:plain

上記接続の通り各部品を接続していきます。

CO2センサー(mh-z19)接続

CO2センサーを接続し動作確認を行います。

uartの有効化

/boot/config.txtに以下を追加してraspberry piを再起動する

enable_uart=1
モジュールのインストール
pip install getrpimodel
接続確認

以下のプログラムを適当な名前で保存し実行します。

import sys
import serial
import time
import subprocess
import getrpimodel
import datetime
from time import sleep 
import RPi.GPIO as GPIO

if getrpimodel.model() == "3 Model B":
  serial_dev = '/dev/ttyS0'
  stop_getty = 'sudo systemctl stop serial-getty@ttyS0.service'
  start_getty = 'sudo systemctl start serial-getty@ttyS0.service'
else:
  serial_dev = '/dev/ttyAMA0'
  stop_getty = 'sudo systemctl stop serial-getty@ttyAMA0.service'
  start_getty = 'sudo systemctl start serial-getty@ttyAMA0.service'

def mh_z19():
  ser = serial.Serial(serial_dev,
               baudrate=9600,
               bytesize=serial.EIGHTBITS,
               parity=serial.PARITY_NONE,
               stopbits=serial.STOPBITS_ONE,
               timeout=1.0)

  while 1:
    result=ser.write("\xff\x01\x86\x00\x00\x00\x00\x00\x79")
    s=ser.read(9)
    if len(s)!=0 and  s[0] == "\xff" and s[1] == "\x86":
      return {'co2': ord(s[2])*256 + ord(s[3])}
      break

def main():

   subprocess.call(stop_getty, stdout=subprocess.PIPE, shell=True)
   now = datetime.datetime.now()
   now_ymdhms = "{0:%Y/%m/%d %H:%M:%S}".format(now)

   # Get Data
   value = mh_z19()
   co2 = value["co2"]
   print('CO2:' + co2)

   subprocess.call(start_getty, stdout=subprocess.PIPE, shell=True)

if __name__ == '__main__':
   main()

CO2の値が取得されていれば正しく接続できています。
室内だと通常400〜1000ppm程度かと思いますが、異常な値が取得されていれば以下のサイトを参考に補正してみてください。
qiita.com

液晶(PiOled)接続

PiOledを接続後、以下の設定を行います。
手順はRaspberry Pi 3 Model Bに Adafruit PiOLED を接続 - Qiitaを参考にしました。

カーネルモジュール自動ロード設定

/boot/config.txtに以下を追加してraspberry piを再起動する

dtparam=i2c_arm=on
必要ライブラリのインストール
sudo apt-get install python-dev
sudo apt-get install python-imaging
sudo apt-get install libffi-dev
sudo pip install smbus-cffi
sudo pip install RPi.GPIO 
カーネルモジュールのロード
sudo modprobe i2c-dev
$ dmesg | grep i2c

ロードされていることの確認
cat /proc/devices | grep i2c
 89 i2c

/dev/i2c-1 の作成

sudo mknod /dev/i2c-1 c 89 1
$ ls /dev/i2c-1
/dev/i2c-1
PiOledの接続確認

以下のように出力されていれば正しく接続されています。

sudo i2cdetect -y 1
     0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f
00:          -- -- -- -- -- -- -- -- -- -- -- -- --
10: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
20: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
30: -- -- -- -- -- -- -- -- -- -- -- -- 3c -- -- --
40: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
50: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
60: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
70: -- -- -- -- -- -- -- --
サンプルの実行確認
git clone https://github.com/adafruit/Adafruit_Python_SSD1306.git

cd Adafruit_Python_SSD1306
sudo python ./setup.py build
sudo python ./setup.py install
python ./examples/stats.py &

液晶にIPアドレス、CPU、メモリ、ディスクが表示されていればOKです。

圧電スピーカ接続

接続後に以下のプログラムを実行してスピーカーから音が鳴ればOKです。

import RPi.GPIO as GPIO
import time

SOUNDER = 21

GPIO.setmode(GPIO.BCM)
GPIO.setup(SOUNDER, GPIO.OUT, initial = GPIO.LOW)

p = GPIO.PWM(SOUNDER, 6500)
p.start(50)
time.sleep(0.5)
p.stop()
time.sleep(0.5)

p.stop
GPIO.cleanup()

プログラムの配置

CO2濃度取得プログラムの配置

以下の通りプログラムを取得し適当なところに配置します。(ここでは/home/pi/program/配下に配置しました。)

git clone https://github.com/cloudfish7/co2_sensor.git

cronの設定
1分ごとと1時間ごとにデータを取得するように設定します。

*/1 * * * * sudo python /home/pi/program/co2_sensor/mh-z19.py minutes
0 * * * * sudo python /home/pi/program/co2_sensor/mh-z19.py hour

表示用Webプログラムの配置

以下の通りプログラムを取得し適当なところに配置します。(ここでは/home/pi/program/配下に配置しました。)
グラフ表示についてはOpen Source Image Charts Replacement | QuickChartを利用しているためraspberry piについてはインターネット接続されている必要があります。

git clone https://github.com/cloudfish7/co2_web.git

以下コマンドで実行します。(自動起動設定してないです)

python app_co2.py

ブラウザで以下アドレスにアクセスするとweb画面が表示されます。
IPアドレスは液晶にも表示されるようになっています。

http:ip_address:8080

車載にあたっての問題

車に積んだ際に電源が問題になります。
シガーソケットからUSBで電源を取得していた場合、エンジンを切ると電源供給が止まるためRaspberry Piが突然落ちることになります。そうするとSDカードへの書き込み途中の場合、ファイルが壊れてしまい最悪OSが起動しなくなる可能性があります。
組み込み系のデバイスの場合はROM化して書き込みを無くすようなので、Raspberry PiでもROM化できないか調べてみたところoverlayfsという仕組みを使うことでSDカードへの書き込みを制御できることが分かりました。
以下の記事に設定方法が詳しく書かれていましたが、設定後システムが不安定になる場合もあるようなので今回は諦めることにしました。
qiita.com
代わりにモバイルバッテリーを電源に使うことにしました。
少し面倒ですがとりあえずは当面これで使ってみたいと思います。OSのシャットダウンについてはWebの画面にシャットダウンボタンを付けて代用しようかと考えています。

まとめ

CO2濃度計を作ってから自宅、車と両方で使っています。
大気中のCO2濃度は約400ppmくらいなので、換気が行き届いている場合は同様の数値となりますが、窓を締めるとすぐに数値が上がっていきました。
実際に車で4人が乗り、内気循環にしていると30分程度で2000ppmを超える濃度となりました。2500ppmを超えるとパフォーマンスが落ち眠気を誘うとの研究もありますので、運転中は適度に換気したほうがよさそうですね。

AWS GlueでS3から差分データを取得する

日次処理などの定期処理でS3からデータを取得しparquet形式に変換やDBにLoadするといったケースについては、ETLする際によくあるのではないでしょうか?
上記のようなケースでは、ETL処理の際にS3から前日からの差分を取得する必要があります。Glueにおいて自動生成されるコードでは、対象のデータソース全てを読み込んでしまうため読み込み後にフィルタする必要があります。すべて読み込んでフィルタするのはデータ量が多いとあまり効率はよくないですよね。
今回はこうしたケースにおいてどのように取得できるか考えてみたいと思います。

差分データを取得する方法

DynamicFrameを利用して取得する方法としては以下3つがあるかと思います
①from_catalogue関数でPushdown Predicatesオプションを利用
Pushdown Predicatesを利用すると読み込み時に条件を指定してフィルタすることが可能です。
ただし、データソースがパーティショニングされている必要があるため、データソースによっては簡単に使えない場合があります。設定方法については以下で紹介されていますので参照してください。
AWS Glue の Pushdown Predicates を用いてすべてのファイルを読み込むことなく、パーティションをプレフィルタリングする | Developers.IO

②JobBookmarkを利用
JobBookmarkを利用すると前回読み込んだところまで記録されているため、差分のみ取得することが可能となります。
バックエンドで色々とやってくれるので便利でいいのですが、どのような仕組みで実現されているのか見えないため、処理に問題があった際にリカバリが難しくなることが考えられます。また、ETL処理の前に毎回クローラーを実行する必要があります。
AWS GlueのJob Bookmarkの使い方 - cloudfishのブログ

③from_options関数を利用
from_options関数を利用することでS3のパスを直接指定することが可能です。この方法の場合、データソースがパーティショニングされている必要はなくパスを指定することで読み込みが可能です。

今回は③の方法について試してみたいと思います。

準備

まずは、適当なCSVファイルをS3に配置しておいてください。

ジョブの作成

ジョブを作成し以下のコードを入力して実行してください。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#from_options関数で読み込み
datasource0 = glueContext.create_dynamic_frame.from_options( connection_type = "s3",connection_options = {"paths": [ "s3://glue-testdata-xxxxx/input/"]},format="csv", format_options={ "withHeader": "true","skipFirst": "true"})

datasink2 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://glue-testdata-xxxxx/output"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

ポイントはfrom_options関数を使って読み込むところになります。connection_optionsにS3のパスを指定します。ここはファイル指定や複数入力可能です。
また、format_optionsで1行目をヘッダーとして読み込むことや読み飛ばすこともオプションで可能となっていますが、現時点ではバグのため指定がきかないようなので、読み飛ばしやカラム名の指定については独自で実装してやる必要があります。
また、型変換、カラムの削除なども自分で実装する必要があります。

まとめ

定期実行してデータを取り込むような処理はよくあると思いますのでそういった場合に使えそうですね。
ただし、まだバグがあるようなので利用する際はしっかりと検証したほうがよさそうです。

obnizで停電の復旧確認をしてみた

みなさん台風21号の影響はどうでしたでしょうか?今回は私の自宅も3日ほど停電になり結構困りました。
未だに停電から復旧していない地域もあるので早く復旧してほしいですね。
停電してはじめて分かるのですが、電気が生活の基礎を支えているということに改めて気付かされます。断水していなくてもトイレの水も流せないですし、お風呂も沸かせないといった状況になり色々なことができませんでした。
当時、自宅で仕事をしていたのですが、ネット接続も切れてしまったため仕事もできなくなりました。
幸いにも親類の家は停電していなかったため、台風通過後に避難することができたためずっと不便な生活を強いられることはなかったのですが、避難できる先が無いような方は大変だったと思います。

以前も雷による停電を経験していたこともあるのですが、避難するといつ停電から復旧したかが分からないのが非常に困ります。
そのため、今回はどうにかして外部から停電の復旧を確認したいと考えていました。
自宅にサーバを立てて外部から入れるようにしていた時期もありましたが、今はそんなこともしていないのでサーバで復旧確認もできませんでした。

そこで思いついたのが、最近手に入れたobnizを復旧確認に利用する方法でした。
obnizで遊んでいる時にたまたま気がついたのですが、obnizを外部マシンのNodejsから実行するとobnizがオンラインになるまでタイムアウトせず実行し続けており、オンラインになった時点でプログラムが実行される挙動となっていたため、これを利用することで停電の復旧確認ができるのではないかと思い自宅で実証実験をしてみました。

前提としては自宅のwifiに接続されていたことと、復旧時に電源が供給されるようにしておくことです。
プログラムは実行できれば何でもいいと思います。オンラインになったらslackに通知するといったようなことも考えましたが、そもそもテストもできないため文字列を表示するだけの簡易なプログラムとしました。

const sleep = require('system-sleep');
var Obniz = require("obniz");

var obniz = new Obniz("XXXX-XXXX");
obniz.onconnect = async function () {

    while(true){
       console.log('---------------------');
       console.log('Connected!');
       sleep(5000);
    }
}

これを動かし続けておくことで、obnizがオンラインになったタイミングで実行されるはずなので電気の復旧が分かると考えました。
実際に仕事中は作業用MacのDockerコンテナから上記プログラムを動かし続けていました。

また、以下のOnlineEditorからも実行できるので、仕事中以外は時々スマホからサンプルプログラムを実行するということを試していました。
Online Editor - obniz

本当に動くか不安でしたが、停電から3日後の9/6 10時40分頃にコンソールにログが表示されていたため、おそらく電気が復旧したということが分かりました。

まとめ

今回は台風による停電のためブレーカーを落とすことはしませんでしたが、地震などの場合には復旧時に火事になるケースがあるためブレーカーを落として避難することが推奨されていますので、充分注意してください。
また、途中で気がついたのですが、自宅にAmazon Echoがある方は、スマホのアプリからオンラインかどうかが分かるため、わざわざobnizを使って確認しなくても復旧状況がわかると思います。実際にobnizで復旧確認できた際には自宅のAmazon Echoもオンラインとなっていたため本当に電気が復旧したと確信が持てました。
頻繁に停電が発生するわけではないのでそんなにニーズはないと思いますが、こういった実際の状況でIoT機器を役立てることができたのはすごく面白かったです。

obnizをAWS Fargateから動かしてみた

obnizというIoTのコントロールボードが面白そうだったので早速購入してみました。
obnizの特徴としては、wifiと小型のディスプレイが内蔵されており、プログラムもWeb上からjavascriptで可能というかなりお手軽なガジェットです。
サンプルプログラムを動かすだけなら、箱から出してwifiに接続し、QRコードスマホから読み込み開いたweb上からプログラムを実行することが可能です。
Lチカを試そうとしたのですが、今回は技術の無駄遣いをしてみようと思い、あえてAWSのFargateからobnizを操作してみました。
ちなみにnodejsで実行可能なのでLambdaでも実行できます。
f:id:cloudfish:20180901004439p:plain

準備

obniz x1
Mac(docker環境インストール済み)

手順概要

  • Dockerイメージ作成
  • obnizアプリ作成
  • Fargate設定
  • 実行確認

Dockerイメージの作成

今回はコンテナ上でobnizのアプリを作成してそれをベースイメージとします。
まずはnode環境の準備ととcanvasモジュールが使いたかったのでubuntuベースのコンテナを利用しました。

FROM ubuntu
RUN  apt-get -y update
RUN  apt-get install -y nodejs npm vim git
RUN  apt-get install -y libcairo2-dev libjpeg-dev libpango1.0-dev libgif-dev build-essential g++

イメージのビルド

docker build -t obniz_container .

obnizアプリの作成

コンテナにログイン

作成したコンテナにログインする

docker run -it obniz_container /bin/bash

アプリの作成

以下のコマンドを実行してnodeモジュールのインストールとプロジェクトを作成します

mkdir /opt/obniz_test
cd /opt/obniz_test
npm init
npm install system-sleep
git clone https://github.com/Automattic/node-canvas.git
npm install node-canvas

/opt/obniz_testにindex.jsを作成し以下を入力します。OBNIZ-IDは8桁のハード固有の番号を入力してください。
ディスプレイに「Hello Obniz!」という文字列を左から右に延々流し続けるだけのプログラムです。

const sleep = require('system-sleep');
const { createCanvas } = require('canvas');
var Obniz = require("obniz");

var obniz = new Obniz("OBNIZ-ID");
obniz.onconnect = async function () {
    for(var i=0;;i=i+8){
       var canvas = createCanvas(obniz.display.width, obniz.display.height);
       var ctx = canvas.getContext('2d');
       ctx.fillStyle = "white";
       ctx.font = "12px Serif";
       ctx.fillText('Hello Obniz!', i, 40);
       obniz.display.draw(ctx);
       if(i >= obniz.display.width){
            i = 0;
       }
       sleep(500);
    }
}

実行確認

以下コマンドを実行し画面に文字が表示されることを確認します。

node index.js

コンテナイメージの再作成

docker commit {docker ps で表示されるNAMES} obniz_test

ECRにプッシュ

作成したイメージをECRにプッシュしてください。
ECRの作成方法及びプッシュ方法については、【AWS】初めてのECRを参照

Fargate設定

次にFargateの設定を行います。
以下のリンクをクリックして、チュートリアルに沿って作成します。
https://ap-northeast-1.console.aws.amazon.com/ecs/home?region=ap-northeast-1#/firstRun

customを選択し設定をクリックします
f:id:cloudfish:20180831205827p:plain
コンテナ名を入力し、ecrにプッシュしたイメージのurlを入力します。
f:id:cloudfish:20180831205955p:plain
コマンドに「/usr/bin/node,/opt/obniz_test/index.js」を入力します。
f:id:cloudfish:20180831210019p:plain
「次へ」をクリックします。
f:id:cloudfish:20180831210208p:plain
「次へ」をクリックします。
f:id:cloudfish:20180831210231p:plain
クラスター名を入力し、「次へ」をクリックします。
f:id:cloudfish:20180831210245p:plain
設定内容を見直して作成します。
f:id:cloudfish:20180831210341p:plain
5-10分程度でタスクが実行されます。
f:id:cloudfish:20180831210416p:plain

タスクが正常に実行されると以下のように、ディスプレイに「Hello Obniz!」と表示され、左から右へ文字が流れていきます。
f:id:cloudfish:20180901010944p:plain

まとめ

今回は無理やりFargateから動かすということをやってみましたが、正直なところ利用シーンが全然思い浮かばないので全く役にたたない内容だと思いますが、Fargateも試すことができて面白かったです。
obnizは手軽に色々試せるのでかなり面白いコントロールボードだと思いますの。このサイズでディスプレイが付いているのもすごくいいですね。みなさんも是非楽しんでみてください。

AWS Glueのジョブ監視

先日、CloudWatch Eventを確認するとGlueも追加されていたのでジョブの監視をやってみました。

検証構成

f:id:cloudfish:20180814085941p:plain
CloudWatch EventでGlueのJobステータスがSucceededもしくはFailedになればSlackに通知しています。
現状、特定ジョブやイベント(Failedのみなど)のみ通知できないので、フィルタをかけれるようにLambdaを利用することにしました。特定イベントのみ通知できるようになればSNSも使えそうです。
今回はLambdaを利用しているので、Slackだけでなく他のAPIを呼び出すなども可能です。

手順

Slack通知用Lambdaの作成

事前にKMSのキーとSlackのチャンネルに通知するためのIncoming Webhookのエンドポイントを作成しておいてください。
「Lambda」→「関数の作成」をクリックし「一から作成」を選択して以下を入力します。
名前:GlueMonitorSlackNotification
ランタイム:Python3.6
ロール:テンプレートから新しいロールを作成
ロール名:lambda-glue-role
ポリシーテンプレート:KMSの復号化アクセス権限
f:id:cloudfish:20180814092416p:plain

プログラムは以下に置き換えます。

#coding:utf-8
from __future__ import print_function

import json
import urllib.parse
import boto3
import logging
import os
import sys

from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

logger = logging.getLogger()
logger.setLevel(logging.INFO)
SLACK_CONFIG = {}

def decrypt_environment(encrypt_environment_key):
    encrypted_value = os.environ[encrypt_environment_key]
    return boto3.client('kms').decrypt(CiphertextBlob=b64decode(encrypted_value))['Plaintext'].decode()

def init():
    global SLACK_CONFIG
    SLACK_CONFIG = {
        "END_POINT": decrypt_environment('SlACK_END_POINT'),
        "ICON": ":aws:",
        "USERNAME": 'GlueJobBot'
    }
    
def post_message(event):
    post_message = {}
    post_message["icon_emoji"] = SLACK_CONFIG['ICON']
    post_message["username"] = SLACK_CONFIG['USERNAME']
    post_message["attachments"]= [0] * 1
    post_message["attachments"][0]= {}
    post_message["attachments"][0]["fallback"] = 'Glue Jobが実行しました'
    post_message["attachments"][0]["color"] = '#36a64f'
    if event['detail']['state'] == 'FAILED':
       post_message["attachments"][0]["color"] = '#E51616'
    post_message["attachments"][0]["pretext"] = ''
    post_message["attachments"][0]["author_name"] = ''
    post_message["attachments"][0]["author_link"] = ''
    post_message["attachments"][0]["title"] = 'Glue Job ' + event['detail']['jobName'] + ":" + event['detail']['state']
    post_message["attachments"][0]["text"] = 'JobID:' + event['detail']['jobRunId']

    payload_json = json.dumps(post_message)
    data = urllib.parse.urlencode({"payload": payload_json})

    req = Request(SLACK_CONFIG['END_POINT'], data.encode('utf-8'))
    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted!! %s", post_message["attachments"][0]["title"])
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

def lambda_handler(event, context):
    init()
    post_message(event)
    

関数を作成後、環境変数にSlACK_END_POINTとIncoming Webhookのエンドポイントを入力します。
「伝送中の暗号化のためのヘルパーの有効化」をチェックし、「伝送中に暗号化する KMS キー」に作成したKMSキーを選択して、環境変数欄にある「暗号化」をクリックします。
f:id:cloudfish:20180814124215p:plain

CloudWatch Eventの作成

サービス名:Glue
イベントタイプ:Glue Job State Change
ターゲット:Lambda関数
機能:GlueMonitorSlackNotification ←作成したLambda関数を選択
f:id:cloudfish:20180814125109p:plain

Glue Jobの作成

作成済みのジョブがあればそれを利用してください。
なければ、左メニュー下のチュートリアルのジョブの追加に従ってジョブを追加してください。

実行確認

準備ができればジョブを実行し、Slackの該当チャンネルに通知されているか確認します。
ジョブが成功した時
f:id:cloudfish:20180814130323p:plain
ジョブが失敗した時
f:id:cloudfish:20180814130330p:plain
正しく設定できていれば上記のように通知されるはずです。
現時点でCloudWatch Eventではジョブの指定や失敗時のみなどの細かい設定ができないため、Lambda側でフィルタして通知する必要があります。
これでジョブの成功失敗を監視することができそうです。

Glueのメトリクス

また、Glueにジョブのプロファイリング機能が追加されています。
ジョブのプロファイリング機能を有効にすると以下のメトリクスが取得できます。以下は簡単ですが日本語に訳した内容となります(By Google翻訳
各メトリクスの詳細については、以下を参照してください。
Monitoring AWS Glue Using CloudWatch Metrics - AWS Glue
これらのメトリクスはジョブの中長期的な処理傾向に問題がないかどうかを確認できると思うので、ジョブの内容に合わせて必要なメトリクスを定期的にモニタリングもしくは監視していけばいいかと思います。

* メトリクス * 詳細
glue.driver.aggregate.bytesRead すべてのエグゼキュータで実行されているすべての完了したSparkタスクによって、すべてのデータソースから読み取られたバイト数。
glue.driver.aggregate.elapsedTime ETL経過時間(ミリ秒単位)(ジョブのブートストラップ時間は含まれません)
glue.driver.aggregate.numCompletedStages ジョブの完了段階の数
glue.driver.aggregate.numCompletedTasks ジョブ内で完了したタスクの数
glue.driver.aggregate.numFailedTasks 失敗したタスクの数
glue.driver.aggregate.numKilledTasks killされたタスクの数
glue.driver.aggregate.recordsRead すべてのエグゼキュータで実行されているすべての完了したSparkタスクによって、すべてのデータソースから読み取られたレコードの数
glue.driver.aggregate.shuffleBytesWritten 以前のレポート(AWS Glue Metrics Dashboardによって集計されたデータで、前の1分間にこの目的で書き込まれたバイト数として集計されたもの)以降、すべてのエグゼキュータがデータをシャッフルするために書き込んだバイト数
glue.driver.aggregate.shuffleLocalBytesRead 以前のレポート(AWS Glue Metrics Dashboardによって集計されたバイト数)で、すべてのエグゼキュータがそれらの間でデータをシャッフルするために読み込んだバイト数
glue.driver.BlockManager.disk.diskSpaceUsed_MB すべてのエグゼキュータで使用されるディスク容量のメガバイト
glue.driver.ExecutorAllocationManager.executors.numberAllExecutors 実行中のジョブ実行者の数
glue.driver.ExecutorAllocationManager.executors.numberMaxNeededExecutors 現在の負荷を満たすために必要な最大(実行中のジョブおよび実行中のジョブ)エグゼキュータの数
glue.driver.jvm.heap.usage
glue.executorId.jvm.heap.usage
glue.ALL.jvm.heap.usage
ドライバ、executorIdによって識別されるエグゼキュータ、またはすべてのエグゼキュータのJVMヒープによって使用されるメモリの割合(スケール:0-1)
glue.driver.jvm.heap.used
glue.executorId.jvm.heap.used
glue.ALL.jvm.heap.used
ドライバのJVMヒープで使用されるメモリバイト数、executorIdによって識別されるエグゼキュータ、またはすべてのエグゼキュータ
glue.driver.s3.filesystem.read_bytes
glue.executorId.s3.filesystem.read_bytes
glue.ALL.s3.filesystem.read_bytes
ドライバによってAmazon S3から読み込まれたバイト数、executorIdによって識別されたエグゼキュータ、または前回のレポート以降のすべてのエグゼキュータ(AWS Glue Metrics Dashboardによって前の1分間に読み取られたバイト数として集計)
glue.driver.s3.filesystem.write_bytes
glue.executorId.s3.filesystem.write_bytes
glue.ALL.s3.filesystem.write_bytes
以前のレポート(AWS Glue Metrics Dashboardによって前の1分間に書き込まれたバイト数で集計されたもの)以来、ドライバ、ExecutorIdによって識別されたエグゼキュータ、またはすべてのエグゼキュータによってAmazon S3に書き込まれたバイト数
glue.driver.system.cpuSystemLoad
glue.executorId.system.cpuSystemLoad
glue.ALL.system.cpuSystemLoad
ドライバによって使用されたCPUシステム負荷の割合(スケール:0-1)、executorIdによって識別される実行プログラム、またはすべての実行プログラム

まとめ

Glueが出た当初は、CloudWatch EventからGlueのステータス変更は取得できなかったと思うので、ステータス取得用のLambdaで別途作る必要がありましたが、CloudWatch Eventから拾えるようになったのでジョブの成功・失敗が拾いやすくなりました。もう少しCloudWatch Event側でジョブやステータスの指定など細かく制御できると失敗のみSNSで通知するなどができるので手軽にできるのですが今後の改善に期待です。
また、CloudWatchメトリクスも提供されましたので、これらも含めてGlueのジョブ監視を行なうことができそうですね。