Airflow paralelizm

 Airflow ile bir dag icerisinde paralelde tasklar calistirabiliriz evet bu dogru. Ancak kac taski anyni anda paralel calistirabilirz bu birkac seye bagli. 

1- Executor
Hangi executor'u kullaniyorsunuz? Airflow'u lokal olarak kurdugunuzda default olarak SequentialExecutor kullaniyor. Ki bu da zaten paralel calistirma imkani vermiyor. Yine lokalde tek makinede ama birden fazla task calistirmak istersek bu sefer LocalExecutor kullanabiliriz. Zaten gerisi ( CeleryExecutor, DaskExecutor, KubernetesExecutor, CeleryKubernetesExecutor ) paralel calistirma uzerine kurulu. Ozet olarak en az LocalExecutor ile calisiyor olmamiz lazim ki paralelizasyon ayarlamasi yapabilelim. 

2- Airflow paralelizm
Tum daglarda (ayni anda birden fazla dag da calisabilir sonucta) ayni anda toplamda kac paralel task calisabilecegini oncelikle ayarlamamiz gerekiyor. Cunku bir ust limit. Bunu ayarlamanin 2 yolu var. Birincisi airflow.cfg dosyasindan bu ayari degistirebiliriz ya da airflow scheduler start olmadan once bir environment variable set ederek bu parametreyi airflowa verebiliriz. 

Ben halihazirda docker compose ile airflow kullandigim icin airflow.cfg doyasi docker image icerisine gomulu, dolayisi ile env var secenegini tercih ediyorum. 

export AIRFLOW__CORE__PARALLELISM=50

seklinde airflow bir anda 50 task paralelde calistirabilsin diye belirtiyorum. 

3- Dag paralelizm
Toplam airflow paralelizmini ayarladiktan sonra bir dah icin de maksimum paralel calisacak task sayisini ayarlamamiz gerekiyor. Default olarak bu deger 16. 

dag = DAG(
    "super_paralel_dag",
    schedule_interval=None,
    tags=[],
    default_args=args,
    concurrency=50
)

Seklinde concurrency degerini 50'ye set ederek bu dagin ayni anda 50 taski calistirabilmesini sagliyorum. 

Bu ayarlamari yaparsak eger cok super paralel calisan bir dag elde etmis oluruz. 

Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding