Kayıtlar

airflow etiketine sahip yayınlar gösteriliyor

Airflow paralelizm

 Airflow ile bir dag icerisinde paralelde tasklar calistirabiliriz evet bu dogru. Ancak kac taski anyni anda paralel calistirabilirz bu birkac seye bagli.  1- Executor Hangi executor'u kullaniyorsunuz? Airflow'u lokal olarak kurdugunuzda default olarak SequentialExecutor kullaniyor. Ki bu da zaten paralel calistirma imkani vermiyor. Yine lokalde tek makinede ama birden fazla task calistirmak istersek bu sefer LocalExecutor kullanabiliriz. Zaten gerisi ( CeleryExecutor, DaskExecutor, KubernetesExecutor, CeleryKubernetesExecutor ) paralel calistirma uzerine kurulu. Ozet olarak en az LocalExecutor ile calisiyor olmamiz lazim ki paralelizasyon ayarlamasi yapabilelim.  2- Airflow paralelizm Tum daglarda (ayni anda birden fazla dag da calisabilir sonucta) ayni anda toplamda kac paralel task calisabilecegini oncelikle ayarlamamiz gerekiyor. Cunku bir ust limit. Bunu ayarlamanin 2 yolu var. Birincisi airflow.cfg dosyasindan bu ayari degistirebiliriz ya da airflow scheduler start olmad

Iki network, bir DAG

Resim
Baya bir suredir devlog yazmiyorum. Yine bir solukta okuyacaginiz bir devops / dataops macerasi ile karsinizdayiz.   Day 1 Is yerinde tamamen iki farkli ortamda calismak durumundayiz. Birtanesi is yaptigimiz sirketin super secure data-centeri otekisi de kendi sirketimizin on-premises makinesi. Bu iki farkli ortam uzerinde dagilmis bir airflow pipeline'a ihtiyac var. Bakalim nelerle karsilacagiz.  Diyelim ki is yaptigimiz sirket X, kendi sirket makinemiz ise Y.  X'ten Y'ye rest api , ssh veya baska hicbir sekilde ulasilamiyor. Ama Y'den X'e ssh yapilabiliyor.  Ama airflow dag X'te basladigi icin, X'ten Y'ye bir trigger gerekiyor. Tersi olsa kolaydi cunku ssh imkani var Y'den X'e. Durum asagi yukari soyle: Tek iletisimimiz sshfs uzerinden mount edilebilen MapRFS olarak gozukuyor. Simdi plan su: 1. X'teki DAG, isi bittigi ve Y'dekini trigger etmek istedigi zaman MaprFS uzerinde belirlenmis bir lokasyona bir dosya olusturacak 2. Y'deki ma

Airflow ile remote directory mount etme

 Uzak bir makinede dockerize olarak calisan bir airflow instance'im var. Bu airflow instance'i da baska bir remote machine'deki bir directory'e erisip, okuyup yazmasi icab ediyor. Aslinda gecen bir postta bahsetmistim. Dosyalari kopyalamak cok zahmetli onun icin mount etmemiz daha mantikli.  Esasinda isin ozu su ki, eger uzak makinedeki directory zaten mount edilmisse bisey yapma. Eger degilse bunu mount et. Burada 3rd party bir uygulama olan sshf kullandim ki onceki bir yazida bahsetmistim. Bunu kurmak kolay, ve de parolayi da bir cakallik ile buna aktarabiliyoruz.  Esas olay bir bash script'ini uzak makineye gonderip orada calistirmak: bash -c "if mountpoint -q -- '{lokaldeki_mount_noktasi}'; then \     printf 'maprfs is already mounted'; \ else \     echo '{password}' | sshfs -o password_stdin -o allow_other kullanici@uzak_makine:klasor_to_mount { lokaldeki_mount_noktasi }; fi" Bu komutu SSHOperator ile host makineye gonderiyorum

Airflow'da remote folder download edememe

Resim
Bazi birtakim workflow'lar icin remote bir makineden bir klasoru airflow'un calistigi makineye indirmem gerekiyor. Olaylar gelisiyor.  Abi benim konuyla en ufak bir alakam yok. Ilk hedef SFTPOperator, cunku bu operator ile verilen bir ssh connection ile istenilen dosya remote makineden kopyalanabiliniyor. ANCAK, ben dosalarla tek tek ugrasamam. Cunku cok dosya var ve bana recursive bir operasyon gerekiyor. AMA, arastirdigim kadari ile SFTPOperator'u recursive bir dosya islemini desteklemiyor. Yol ayrimina geliyoruz: 1. Remote makinedeki kopyalancak directory'deki tun dosyalari listeleyip ona gore bir loop icerisinde tek tek indirmek gerekiyor, 2. Remote makineden kopyalayacagim directory'i, host makineye mount etmek. Bu sayede kopyalamaya gerek kalmayacak ve buradaki dosyalari sanki local dosyaymis gibi isleyebilecegiz.  Birinci yol cok killi yunlu, ic ice birsuru dosyalar var, bunlari manuel olarak recursive bir sekilde indirmek cok zor. Ikinci secenek su sekilde c

Airflow ile K8s Otomasyonu

Resim
Baaazi dosyalari isleyerek baaazi output ureten bir pipelinimiz var.  Bu dosyalari hizli isleyebilmek icin, python'da yazdigimiz uygulamayi dockerize edip, k8s uzerinde paralel calistiriyoruz. Ancak elle yapiyoruz bu isi. Ne kadar da amelece.  Simdi bunu alip, Airflow ile otomatize edelim. 1. Gun Mevcut Yontem Halihazirda dosyalari manuel olarak 20 farkli klsore boluyoruz. Daha sonra 20 farkli Pod kaldirip bu klasorleri parametere olarak veriyoruz. Klasorler de ayrica distributed bir file storeage uzerinde bulunuyor, volume mount ile hallediyoruz. Haliyle pod icerisinde calisan python uygulamasi sadece kendine verilen yol'daki dosyalari isliyor. Boylece kimse kimsenin isine karismadan butun dosyalari paralel olarak isliyoruz.   Ama otomatize edebilmek icin cok da iyi bir yontem degil.  1. Elle (veya hatta otomatize olarak) dosyalari klasorlere bolmek istemiyorum 2. Pipeline'i kolay bir sekilde trigger edebilmek istiyorum 3. Parallesim'i kolayca ayarlayabilmek istiyorum