Iki network, bir DAG

Baya bir suredir devlog yazmiyorum. Yine bir solukta okuyacaginiz bir devops / dataops macerasi ile karsinizdayiz.  

Day 1

Is yerinde tamamen iki farkli ortamda calismak durumundayiz. Birtanesi is yaptigimiz sirketin super secure data-centeri otekisi de kendi sirketimizin on-premises makinesi. Bu iki farkli ortam uzerinde dagilmis bir airflow pipeline'a ihtiyac var. Bakalim nelerle karsilacagiz. 

Diyelim ki is yaptigimiz sirket X, kendi sirket makinemiz ise Y. 

X'ten Y'ye rest api , ssh veya baska hicbir sekilde ulasilamiyor. Ama Y'den X'e ssh yapilabiliyor. 

Ama airflow dag X'te basladigi icin, X'ten Y'ye bir trigger gerekiyor. Tersi olsa kolaydi cunku ssh imkani var Y'den X'e. Durum asagi yukari soyle:


Tek iletisimimiz sshfs uzerinden mount edilebilen MapRFS olarak gozukuyor. Simdi plan su:

1. X'teki DAG, isi bittigi ve Y'dekini trigger etmek istedigi zaman MaprFS uzerinde belirlenmis bir lokasyona bir dosya olusturacak

2. Y'deki makine uzerinde 2 tane systemd servis olacak
    2.1. MapR FS'i mount edecek
    2.2 WatchDog scriptini calistiracak 

    Burda iki systemd servisi arasinda bir dependency var. Ama soylenene gore systemd bunu handle edebiliyormus. Neden servislere ihtiyac var? Cunku bazen bu makineye reset atiyor insafsizlar. Haliyle MaprFS disconnect olur, watchdog durur, bizim pipeline got olur afedersiniz. 

3. Y'deki makine uzerinde surekli calisan bir watchdog ile MaprFS uzerindeki belirli lokasyon surekli watch edilecek. Yeni dosya olusturuldugu zaman lokaldeki DAG'ini trigger edecek. Hatta burada dag'a falan da gerek yok, spark job'unu lokalde run edecek. 

4. Y'deki is bittikten sonra yine bir dosya olusturulacak belirlenmis bir MaprFS lokasyonunua

5. X'teki DAG, isini bitirip Y'yi trigger etmek icin dosyasini olusturduktan sonra, bu sefer isin bittigini temsil eden dosyayi gozetlemete baslayacak. Bunu bir airflow file sensor ile yapabiliriz diye dusunuyorum. 

WatchDog
Ise ilk olarak watchdog ile basliyorum. Y'deki makine uzerinde bir python scripi seklinde konumlandiriyorum. watchdog kutuphanesi cok kullanisli, dosya yaratma, silme ve hatta move ve modify islemleri icin bile event handler'lar tanimlayabiliyorsunuz. Asagi yukari watchdog suna benziyor (detaylar suradan aldim)

import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
import os

# event handler
patterns = "*"
ignore_patterns = ""
ignore_directories = False
case_sensitive = True

my_event_handler = PatternMatchingEventHandler(
    patterns, ignore_patterns, ignore_directories, case_sensitive
)

def on_created(event):
    print("{path} has been created!".format(path=event.src_path))
    dosya_geldi_jobumu_calistir()

def on_deleted(event):
    print("deleted {path}!".format(path=event.src_path))

my_event_handler.on_created = on_created
my_event_handler.on_deleted = on_deleted

# Observer
path = "."
go_recursively = True
my_observer = Observer()
my_observer.schedule(my_event_handler, path, recursive=go_recursively)

my_observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    my_observer.stop()
    my_observer.join()

Test etmek icin dikkat ederseniz kendi bulundugu klasoru watch ediyor (observer kismindaki path degiskeni). Test ettim. Gayet guzel calisiyor. Simdi MaprFS'i watch etmesi gerekiyor. Bunun icin de sshfs ile MaprFS'i makineye mount ediyorum. 

sshfs  -o allow_other x_kullanicisi@x_deki_bir_edge_node:mapr_fs_yolu y_deki_maprfs_mount_path

Bunu yaptiktan sonra Y makinesinden MaprFS'e lokal bir klasormus gibi erisebiliyor olduk

Simdi WatchDog'u guncelleyip tekrar test ediyorum. 

Bir sidenote su ki, monolith bir proje yapisina sahibiz. Cok bilincli bir tercih oldgunu dusunuyorum. Bunu da ileride tartisiriz. Ancak dev toollari monolith degil ki. Ha bu IntelliJ python kodu gorunce bir afalliyor, biz de pycharm ve IntelliJ arasinda gidip geliyoruz. 

Test basarisiz. Yeni olsuturulan dosyayi testpit edemedik. Cozum observer degisikligi. 

Network'ten mount edilen klasorlerde standart Observer'in calismadigini gormus olduk. Bunun yerine onerilen PollingObserver kullanilmasi. Kodda cok ufak bir degisiklik ile bu sorunu cozuyoruz. 

WatchDog tamam, calisiyor. Yani enazindan dosya olusturuldugunda farkedebiliyor. 

Day 2

Ikinci asama olarak da dosya yaratildigi zaman gercek data processing job'unu Y'deki makinede calistirmak. Bunu detaylarini veremiyorum ama client mode ile lokalde bir spark job calistiriyorum. Bunun icin olusturulan dosya adini parse etmek ve gerekli argumanlari buradan almak gerekiyor. Ornegin spark job'un prod mu yoksa staging mi gibi hangi environment'te calismasi gerektigi bilgisini dosya adindan alinarak spark job'una pass edilmesi gerekli. 

Airflow FileSensor
Bu sekilde X'ten Y uzerinde bir job trigger ettik. X'in bu sefer Y'deki job'un sonucunu beklemesi gerekiyor. Bu bekleme islemini yine DAG icerisinden airflow'un FileSensor operatoru ile yapacagiz. Burada korktugum bir nokta ise Y makinesindeki Poll gereksinimi. Cunku watchdog mount edilmis klasorlerde tam olarak watch edememisti. Bakalim Airflow FileSensor bunu yapabilecek mi? 

Bunu denemek icin izole, cok ufak bir dag olsuturup X uzerindeki Airflow'da test edecegim. 

Deneme cok da guzel basarili oldu, docker container icerisinde calisan airflow, belirli bir path uzerine mount edilmis olan MaprFS uzerinde, verilen dosya olustugu anda success olarak tamamlandi. Test dag soyle bisey:

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.models import DAG, Variable
import airflow

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG("file_sensor_test", default_args=args, schedule_interval=None, tags=["Xcom", "test"])

start_task = DummyOperator(task_id="start", dag=dag)
stop_task = DummyOperator(task_id="stop", dag=dag)

sensor_task = FileSensor(
    dag=dag,
    fs_conn_id="fs_baglanti_id",
    task_id="git_bi_bak_olustu_mu_git_bi_bak",
    poke_interval=30, # sanirim saniye
    filepath="/mount_noktasi/bazi_path/deneme.txt"
)

start_task >> sensor_task >> stop_task

Tabi bunun icin once bir file system connection olusturmak gerekiyor Airflow admin paneli uzerinde. Seceneklerden tip icin file system secilip, kullanici adi parola girildikten sonra son olarka da ekstra parametreler kisminda {"path": "/mount_noktasi"} seklinde tanimlama yaparsak, bu folder uzerinden bir file system connection olusturmus oluyoruz.  (mount_noktasi yukaridaki kodda da var, dikkat!)

Simdi bu denemelik file system sensor taskini, X'te calisan gercek DAG'e ekliyorum. 

Sonuc basarili. X'ten Y'deki jobu trigger edebiliyoruz. Ve de Y'deki job bitince bunu X'de anlayabiliyor ve DAG'a devam edebiliyoruz. 

XCom ve bir trade-off
Orijinal fikirde, X'ten Y'ye job trigger eden dosya icerisinde trigger edildigi andaki tarih ve saat bilgisi vardi. Bu sayede her bir trigger unique olacak ve o triggeri bekleyen FileSensor de, gercekten sadece o trigger sonuclanmis ise devam edecekti. Bu sayede aslinda concurrent pipeline calistirabilecektik. Ancak bunu yapmak icin trigger dosyasini olusturan airflow python task'i bir tarih-saat degeri uretim bunu x-com'a push etmeli ve downstream'de bekleyen FileSensor de tam olarak bu tarih saati iceren dosyayi beklemeli. Bunun icin de yine x-com'dan pull etmeli. 

Pipeline'i yeni kurdugum icin concurrent calisma durumu da olmayacagi icin simdilik trigger dosyasi adinda tarih-saat bilgisi tutmuyorum. Sabit bir dosya adi ile ierleyince x-com'a da gerek kalmiyor. Haa cok istenirse ileride, pipeline oturduktan sonra cok rahat bir sekilde eklenebilir. 

systemd servisleri
Simdiye kadar Y uzerinde MaprFS mount edili oldugunu ve WatchDog'un da calistigini kabul ettik. Ancak Y makinesinden kendi kullanicim bile logout olsa MaprFS mount gidiyor, watchdog da duruyor haliyle.  

Burada bir maceranin daha sonuna gelmis oluyoruz. Henuz systemd servislerini olusturmadim ama  DAG calisiyor gayet guzel bir sekilde. Iki farkli network uzerine dagilmis is yuku ve tamamen MaprFS'e dayanan haberlesme sistemi ile datamizi isliyor, sonuclar uretiyor.

systemd servisi olusturma kismini da baska bir postta ele almak dilegiyle sayin okurlar, simdilik hoscakalin. 


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding