Airflow ile K8s Otomasyonu

Baaazi dosyalari isleyerek baaazi output ureten bir pipelinimiz var. 

Bu dosyalari hizli isleyebilmek icin, python'da yazdigimiz uygulamayi dockerize edip, k8s uzerinde paralel calistiriyoruz. Ancak elle yapiyoruz bu isi. Ne kadar da amelece. 

Simdi bunu alip, Airflow ile otomatize edelim.


1. Gun

Mevcut Yontem

Halihazirda dosyalari manuel olarak 20 farkli klsore boluyoruz. Daha sonra 20 farkli Pod kaldirip bu klasorleri parametere olarak veriyoruz. Klasorler de ayrica distributed bir file storeage uzerinde bulunuyor, volume mount ile hallediyoruz. Haliyle pod icerisinde calisan python uygulamasi sadece kendine verilen yol'daki dosyalari isliyor. Boylece kimse kimsenin isine karismadan butun dosyalari paralel olarak isliyoruz.  

Ama otomatize edebilmek icin cok da iyi bir yontem degil. 

1. Elle (veya hatta otomatize olarak) dosyalari klasorlere bolmek istemiyorum
2. Pipeline'i kolay bir sekilde trigger edebilmek istiyorum
3. Parallesim'i kolayca ayarlayabilmek istiyorum (ornekte 20 pod ayni anda calisiyor, belki 50 yapmak istiyorum?, `ne belli?`)


Airflow, amcalara dag'ini goster oglum

1. Tum islenecek dosyalar ayni klasorde yer aliyor aslinda. Bunlari bir sekilde logical gruplara ayirmamiz gerekiyor. Bu konuda hash'lemeden yardim alabiliriz. 

Burada iki inputa ihtiyac var. birincisi kac tane bucket olmasini istiyoruz, bu arguman tum pod'lara gonderilecek. Ikinicisi de her pod'a ayri bir bucket_id gondermemiz gerekiyor ki, bu pod, verilen dosyalar arasindan sadece kendi bucket_id'sine karsilik gelen dosyalari secip islesin. Bu sayede paralelde calisan podlar birbirlerinin dosyalarina dokunmasin. 

Bunun icin mm3 isimlu murmur hash kutuphanesini kullaniyorum. 

files_in_bucket = [k for k in all_files if is_in_bucket(k)]

seklinde olabilir. 

2. Elimizde docker image'i olmasi gerekiyor ki bunun airflow ile k8s uzerinde calistirabilelim. Aslinda burada baska implicit akislar da var. 

1. dosyalari isleyen python projesinin dockerize edilmesi gerek

2. dockerize edildikten sonra, bu image repository'e push edilmesi gerekiyor

Baska bir problem de, dosyalari isleyen python projesinin baska bir repo'da olmasi. Bunu da yine airflow dag iceriinden cozebiliriz. Dag icerisinden o repoyu clonlaryip, dockerize edip, image repositorsine push edebiliriz, gibime geliyor suan icin. 


Docker build ile alakali bir problem
Bu arada gene bir problem oldu. Projeyi Dockerizer ederken surekli Temporary failure resolving 'archive.ubuntu.com' hatasi aldim. Bu tarz hatalarla daha once 50 kere karsilasilsa da, bir sure sonra tekrar unutuyorsun. Buraya yazayim da belki baska kisiler de kolayca cozerler cunku cozum cok basit. Ne kadar hata mesaji ubuntu sitesi cokmus gibi izlenim verse de aslinda sourn bizde. Dns ayarlari tirt. Cozum burada (https://medium.com/@faithfulanere/solved-docker-build-could-not-resolve-archive-ubuntu-com-apt-get-fails-to-install-anything-9ea4dfdcdcf2) 


Dockerize ederken esas yapilmasi gereken, bu container calistigi zaman buna bucket_id ve bucket_count argumanlarini verebiliyor olmamiz gerekiyor. Sanirim kolayi environment variables kullanmak. Elimizde iki yapi var ARG ve ENV.  ARG, aslinda adindan da anlasilacagi gibi sadece build-time icin gecerli, verilen argumana gore image'inizi build ediyorsunuz. Ama ARG, build sonrasi olusan image'den uretilen container'larda da ulasiablilir durumda, yani runtime'da. Asagidaki temsili Dockerfile parcacigi ile bu isi halletmis oluyoruz.

ENV bucket_id=0
ENV bucket_count=20

CMD python process_files.py -c $bucket_count -b $bucket_id


Tabi bu ENV argumanlarini bir de k8s tarafindan deploy edilirken de pass etmek gerekiyor. Bunu da Airflow dag'ini gelistirirken anlayacgiz nasil yapabiliriz?

Bu bir devlog oldugu icin isleri yaparken bir taraftan yaziyorum. Suanda build ettigim image'i push ediyorum. Bu image, ayni zamanda bucket_id ve bucket_count argumanlarini da environment'ten okuyarak, python uygulamasini bu argumanlarla ile calistiriyor. Ama cok yavas. vmware uzerinde ubuntu calisiyor (kosuyor lafina uyuz oluyorum), container repository ile de aramizda VPN var.   


2. Gun

K8s operatoru
Gelelim Airflow tarafina. Bucket_id ve bucket_size argumanlarini alan ve uygulamayi buna gore calistiran docker image hazir. (hala push olmadi ama dewamke). Bunun icin ekstra k8s operatorunu kurmak gerektigini ogrenmis bulunmaktauim.

pip install 'apache-airflow[kubernetes]'

Daha sonra bu operatoru kullanarak olusturdugumuz image'i Airflow ile k8s uzerine (benim durumumda OpenShift ama ayni lacivert) deploy edebiliyor olacagiz.


K8s operatoru standart degil!
Ama tabii ki bir problem cikmasa sasiracaktik degil mi? Suanda Airflow'u PostgreSQL database connection'u ile docker-compose ile ayaga kaldiriyoruz. Airflow icin standart image'i (apache/airflow) kullaniyoruz. Ve de bu image icerisinde contrib operatorleri mevcut degil!

cozum 1: Contrib operatorlerini de barindiran bir image bul. (biraz bakindim pek yok gibi)

cozum 2: simdilik docker container'i ayaga kalkiginda , docker exec ile caktirmadan k8s operatorunu pip ile kurmak

Ok, ok. sikinti yok usagum, contrib varmis icinde, import etmeyi becerememisz. meh.


Olamiyoruz
Tamam simdi KubernetersPodOperator basariyla import edildi ve dummy bir image ile denemeye basladim. 

passing = KubernetesPodOperator(
        namespace='default',
        image="Python:3.6",
        cmds=["Python","-c"],
        arguments=["print('hello world')"],
        name="passing-test",
        task_id="passing-task",
        get_logs=True,
        dag=dag)

Bunu sitedeki ornekten aldim. Ama calismiyor. Service host/port is not set hatasi aliyoruz.

Anladigim kadariyla, operator k8s python client'ini kullaniyor. Ben remote bir OpenShift clusterinda calisiyorum ama ~/.kube/config lokasyonunda tum config zaten hazir var. Buradan okuyup calismasini beklerdim ama okuyamiyor sanirim.

2-dk-sonra:

Haha okuyamaz, cunku benim config lokalde ama airflow docker container icinde calisiyor. Bakalim oraya kopyalayayim, calisacak mi?

Container icerisinden Kube client'i calistiramadim cunku cluster ile haberlesemiyor: certificate verify failed. Sanirim hata config okumada degil, cunku load_config calisiyor, hata cluster'a baglanmada. Seritikalari da kopyalayim bakalim neler olacak?

Seritifa olayi degil. Config dosyasindaki token yanlis. Cunku openshift clientini containerin icerisinde indirip elle login olunca remote OpenShift clusterina pyton api tarafindan erisebiliyorum. oc login'den sonra config dosyasini kontrol ettim, token degismis. hmm. 

~~Arada microfront-end tartismalari oluyor ~~

Evet donduk. Simdi efendim bu hata incluster_config.py'dan gelen bir hata. Stack trace'de acikca goruluyor. Container'in bashine girdim, python repl'i actim. Deniyorum.

config.load_incluster_config() --> Service host/port is not set.

Ayni hatayi aldim. Cok guzel. Biraz googellayinca ortaya cikti ki,  eger kubernetes python client, k8s clusteri icerisinde yer aliyorsa bu load_incluster_config metodunu cagirmasi gerekir. Eger cluster disinda ise 

config.load_kube_config() --> sikinti yok :D !

cagirmasi gerekiyor. Bunu da denedim repl ile. Zaten config dosyasini da olusturmustuk, basarili bir sekilde calisti. 

Demek ki bizim k8s pod operatoru kendisini cluster icerisinde zannediyor. K8s pod operatorunun dokumantasyonuna bakiyoruz ve in_cluster seklinde bir parametre aldigina sahit oluyoruz.

Aslinda burada benim sigirligim ki once dokumantasyonu incelemedim. 

Ve eveet, en sonunda airflow icerisinden k8s uzerinde bir pod deploy edebiliyoruz 👍


Ancaaak
Buradaki elle config dosyasi olusturmak, hatta bu config dosyasindaki token expire etmis oldugu icin container icerisine OpenShift client'ini indirip login olup, tokeni guncellemek gecici bir cozum. Prodda bunu yapamaycagimiza gore bu konuya da bir cozum bulmak gerekiyor.

1. config dosyasini disardan pass edebiliriz. ama burada da sikinti var. sonucta config dosyasi icerisinde token var. sensitive bir data, bunu repo'ya push edemeyiz. 

2. secret. veya ticket dosyasi kullanmak gerekiyor ama bunu nasil yapariz henuz bilemiyorum. 


3. Gun

Token meselesi
Simdi gordugumuz gibi kube config icerisine token koyarsak bunu da usulunce KubernetesPodOperator'une verirsek, calisiyor. Kube config icerisindeki mevcut token buyuk ihtimalle kisa sureli ve benim kendi makinemde yaptigim login sonucu olusuyor. Ve bir sure sonra expire oluyor. Bununla Airflow icerisinde ugrasmamak icin oncelikle docker-compose dosyasinda, kendi lokalimdeki kube config dosyasini mount edecegim. Daha sonra da config dosyasinin yolunu Airflow variables'a kaydedecegim.


Hep aci, hep kahir
Simdi kendi makinemdeki kube config dosyasini kullanmasini saglamak icin docker-compose'da ./kube klasorunu mount etmeye calistim. Denemek icin docker-compose restart yaptigimda containerler restart edilemedi. Cok garip bir `permision denied` hatasi aliyoruz, du bakali ne olcek?
Problem app-armour diye bir uygulamadan kaynaklaniyor diyolla. `Blindly` birkac komut calistirdik sof'tan alip ama bakali ne olcek?

Aq komutu docker-compose'u kaldirmis sistemden. simdi tekrar yukleyip yeniden docker-compose up calistirdim. 

Bu arada bir arkadasla konustum. Onlar da bu mevzu bahis kubeconfig dosyasini (long term token barindiran), direg Airflow dag'inin kostugu lokasyona kopyalayip oradan okutturuyorlarmis. Bu biraz rahatlatti, cunku cok kolay ve itiraf etmek gerekirse biraz da amelece bir yontem. 

Beklerken gelecek hayalleri
Simdilik bu Airflow dagini lokalde calistiracagiz ama ilerde kendisine de OpenShift'e deploy edecegiz. Bu durumda bu kubeconfig dosyasini OpenShift secret'leri icerisinde saklayip oradan mount edebilirz diye tahmin ediyorum. Cok da temiz olmaz mi a sayin seyirciler :p

Kendi makinemdeki kubeconfig dosyasini mount etmem ise yaramadi cunku haliyle permission denied hatasi aldik. Benim lokaldeki .kube klasorune de ozel degil genel bir izin vermek (chmod 777 gibi bisey) istemiyorum, bunu da millete soyle yapin diyemem. O yuzden baska bir klasor olusturup ona kopyaladim. Du bakali ne olcek?


Dev darbe: Sertifikasyon
Tabi gene olmadi. Kendi makinemdeki kubeconfig dosyasi ile calisti ama bu sefer urllib3 tarafindan bir `certificate verify failed` hatasi alinmakta. Bu hatayi da daha once bin kere gordum ama gene unuttum. SSL sertifikasi mi eksiktir nedir? En basta SSL Sertifikasi nedir? 

Evet arkadaslar, elimde bir aret ca-cert.crt dosyasi var idi. Mevcut Docker registry ile irtibata gecebilmek adina is yaptigimiz sirket tarafindan provide edilen. Ama ben onu kullanmayi ancak akil edebildim. Olay su, kubeconfig icerisinde bu sertifikayi da belirtebiliyorsunuz:

apiVersion: v1
clusters:
    - cluster:
        server: https://openshift-master-bana-abarti-geldi.org:1453
        certificate-authority: /opt/airflow/config/rica-minnet-ca.crt
        name: openshift-master-hem-de-ne:2023


Simdilik gene cozuldu sorun ama bana guzel gelmedi. Bunu calistirmak isteyen diger team membersa oradan kubeconfig'i kopyala icerisinde bunu ekle falan demek cok iyi bir yontem gibi gozukmedi. Bu certificate dosyasini point etmenin kubeconfig'e yazmaktan baska bir yolu olmali. Kubernetes python client'ina bunu elle vermek mumkun ama biz KubernetesPodOperatorunu kullaniyoruz ve onda da bu tarz bir arguman gozukmuyor. Tek secenek kaliyor, environment variables. Bakalim bu yolla verilebiliyor mu sertifika yolu.


Diger bir gereksinim de volume mount edebilmek. Bizim pod OpenShift uzerinde data isleyecek sonucta. Bu data da MaprFS uzerinde bariniyor, daha onceden olusturulmus volume'u pod'a mount edebilelim ki dataya ulasip bu datayi isleyip geri yazabilsin. 

Birkac import hatasindan sonra KubernetesPodOperatorun argumanlari sayesinde volume mount edebildim. Ancak, bu islemler olurken pod'un ayaga kalkmasi biraz uzun surdu onceki denemelere gore ve OpenSHift uzerinde pod basarili olarak ayaga kalksa bile, airflow uzerinde k8s operatoru timeout oldu ve fail oldu. Bakalaim nasil artiracagiz timeout limitini? Cevap gecikmedi, dokumantasyondan geldi, startup_timeout_seconds argumani ile 5 dakika verdim, du bakali ne olcek?

Evet calisti sayin seyirciler. Pod icerisinden denemelik bir dosya olusturttum ve mapr fs uzerine basarili bir sekilde dosya olustu. Oynatalim, devam.

Simdi son kalan is (sertifika olayina daha elegant bir cozum bulmek disinda) airflow daginin kendisini yazmak :D Aslinda is buydu degil mi, kac gundur nelerle ugrastik daha isin kendisine ancak gelebildik. Ama orasi cok kolay.


Dag'in zirt dedigi yer

Airflow dag icerisinde Dummy operatorler ile baslangic ve bitis tasklarini olusturuyoruz. Daha sonra bir PARALLELISM constanti olusturarak, loop icerisinde yeteri kadar KubernetersPodOperatoru olusturuyoruz. Sonuc soyle:



Simdi bu koclarin her biri, k8s uzerinde bir pod ayaga kaldiracak ve kendisine environment variables vasitasi ile verilen bucket_id ve bucket_count argumanlari ile ilgili dosyalari isleyecekler ve sonuclari da ilgili yerlere yazacaklar. Tabi input ve output dir path'leri de env'den gelecek.

Hala daha ecek acak diyoruz cunku bu argumanlari kabul eden docker image her ne kadar build edilip ve lokalde test edildiyse de, remote container registry'e bir turlu push edilemedi. Benim baglantim cok kotu ama arkadaslardan yardim istedim onlar push etmeye calisiyorlar suanda. Push edildigi anda basacagim testi 1 parallelism ile. Bu aradan buradan askerlik arkadasim 4-hot'a da selam gondermek isriyorum. 

Push tamamlandi, testlere basladim. Ancak Dockerfile icerisinde path ile ilgili bir takim problemler yasaniyor ama cozumu kolay seyler. 

~~4 saat sonra~~

Kolay seyler tabirini kullanan hicbir programcidan tekrar haber alinamamistir. Dockerfile icerisinde problem varmis. Ayrica son anda commentleri uncomment ettikten sonra python indentation hatasi olusmus.  O sekilde master'a merge olmus (tabi test mest ne gezer). Tabi bu arada birsuru de toplanti vardi. Neyse hatalari duzelttim, ama remote docker registry'e push edemiyorum, baglantim boktan. 

Cuma aksami saat 17:48 ve hala planning tartismalarindayiz, image push olmuyor, fix'i repo'ya push ettim ama approve etmesi lazim birisinin.

~~aksam yemegi etc~~

Baska bir remote machine uzerinde docker image'i tekrar build edip oradan container registry'e push ettim cunku benim baglanti ile olacak gibi degildi. Suan testleri yapiyorum ve hersey calisiyor gozukuyor. Datalari isleyip, ciktiyi da yine verilen lokasyona yaziyor suanda !!!


Uc gunluk seruvernin ardindan bu meseleyi de resolve olmus sayabiliriz. Kah gulduk kah agladik. Bana bu surecte rubber duck'lik yaptigin icin tesekkur ederim sayin okur. Bir sonraki dev devops resitalinde daha birlikte olmak dilegiyle esen kalin. 


_________________________________

* Du bakali ne olcek?

* ...bu devops'a çok fazla kafa yorarsan sıyırırsın. kullanacaksın, nimetlerinden kullanıp işini göreceksin, kafayı taktın mı o zaman işin kötü hehe... çok fazla, hikmetine şey yapmamak lazım'' 

* is bu devlog'da hicbir editleme, reduksuyon, geri vites yoktur. bam bam bam yazilmistir. ohhhh beee. 




Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding