Schedule web scrapers with Apache Airflow

  • parse_recipes: parses individual recipes.
  • download_image: downloads each recipe's image.
  • store_data: stores the image path and the parsed data in MySQL.
[Image: Airflow Web UI]
import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def parse_recipes():
    return 'parse_recipes'


def download_image():
    return 'download_image'


def store_data():
    return 'store_data'


default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 10, 1, 10, 00, 00),
    'concurrency': 1,
    'retries': 0
}

with DAG('parsing_recipes',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         # schedule_interval=None,
         ) as dag:
    opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                       python_callable=parse_recipes)
    opr_download_image = PythonOperator(task_id='download_image',
                                        python_callable=download_image)
    opr_store_data = PythonOperator(task_id='store_data',
                                    python_callable=store_data)

    opr_parse_recipes >> opr_download_image >> opr_store_data

What is XCom?

XCom (short for cross-communication) lets Airflow tasks exchange small pieces of data. By default, the value a PythonOperator callable returns is pushed to XCom automatically, and any downstream task can pull it:

def parse_recipes(**kwargs):
    return 'RETURNS parse_recipes'


def download_image(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')
    print('Printing Task 1 values in Download_image')
    print(v1)
    return 'download_image'


opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                   python_callable=parse_recipes,
                                   provide_context=True)
Setting provide_context=True makes Airflow pass the task-instance context into the callable as keyword arguments. Print kwargs inside download_image and you will see everything that is available, something like this:
{
    'dag': <DAG: parsing_recipes>,
    'ds': '2018-10-02',
    'next_ds': '2018-10-02',
    'prev_ds': '2018-10-02',
    'ds_nodash': '20181002',
    'ts': '2018-10-02T09:56:05.289457+00:00',
    'ts_nodash': '20181002T095605.289457+0000',
    'yesterday_ds': '2018-10-01',
    'yesterday_ds_nodash': '20181001',
    'tomorrow_ds': '2018-10-03',
    'tomorrow_ds_nodash': '20181003',
    'END_DATE': '2018-10-02',
    'end_date': '2018-10-02',
    'dag_run': <DagRun parsing_recipes @ 2018-10-02 09:56:05.289457+00:00: manual__2018-10-02T09:56:05.289457+00:00, externally triggered: True>,
    'run_id': 'manual__2018-10-02T09:56:05.289457+00:00',
    'execution_date': <Pendulum [2018-10-02T09:56:05.289457+00:00]>,
    'prev_execution_date': datetime.datetime(2018, 10, 2, 9, 56, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
    'next_execution_date': datetime.datetime(2018, 10, 2, 9, 58, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
    'latest_date': '2018-10-02',
    'macros': <module 'airflow.macros' from '/anaconda3/anaconda/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
    'params': {},
    'tables': None,
    'task': <Task(PythonOperator): download_image>,
    'task_instance': <TaskInstance: parsing_recipes.download_image 2018-10-02T09:56:05.289457+00:00 [running]>,
    'ti': <TaskInstance: parsing_recipes.download_image 2018-10-02T09:56:05.289457+00:00 [running]>,
    'task_instance_key_str': 'parsing_recipes__download_image__20181002',
    'conf': <module 'airflow.configuration' from '/anaconda3/anaconda/lib/python3.6/site-packages/airflow/configuration.py'>,
    'test_mode': False,
    'var': {'value': None, 'json': None},
    'inlets': [],
    'outlets': [],
    'templates_dict': None
}
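Besides relying on return values, a task can push data under an explicit key and a downstream task can pull it back by that key. A minimal sketch (the key name image_urls and the URL are illustrations, not part of the original pipeline):

def parse_recipes(**kwargs):
    ti = kwargs['ti']
    # push a value under an explicit key instead of relying on the return value
    ti.xcom_push(key='image_urls', value=['https://example.com/pic1.jpg'])
    return 'parse_recipes'


def download_image(**kwargs):
    ti = kwargs['ti']
    # pull the value parse_recipes pushed under that key
    urls = ti.xcom_pull(key='image_urls', task_ids='parse_recipes')
    print(urls)
    return 'download_image'

In our pipeline, though, the return value alone is enough: the real download_image pulls the parsed records and downloads each picture: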
def download_image(**kwargs):
    local_image_file = None
    idx = 0
    records = []
    ti = kwargs['ti']
    parsed_records = ti.xcom_pull(key=None, task_ids='parse_recipes')
    for rec in parsed_records:
        idx += 1
        image_url = rec['image_url']
        r_url = rec['url']
        print('Downloading Pic# {}'.format(idx))
        local_image_file = dl_img(image_url, r_url)
        rec['local_image'] = local_image_file
        records.append(rec)
    return records
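dl_img belongs to the scraper itself and is not shown in this post. A minimal sketch of what such a helper might look like, assuming it simply saves the image to disk and returns the local path (requests and the Referer header are my assumptions, not the author's actual code):

import os

import requests


def dl_img(image_url, referer_url):
    # hypothetical helper: download one image and return its local file path
    file_name = os.path.basename(image_url)
    response = requests.get(image_url, headers={'Referer': referer_url})
    response.raise_for_status()
    with open(file_name, 'wb') as f:
        f.write(response.content)
    return file_name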
def store_data(**kwargs):
    ti = kwargs['ti']
    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    print('PRINTING DUMPED RECORDS in STORE DATA')
    print(parsed_records)
    return 'store_data'
Airflow needs a MySQL connection to write the records. Open Admin → Connections in the web UI and edit the mysql_default connection with your own credentials:

[Image: Airflow Admin: DB Connections screen]
[Image: Screen to change MySQL credentials]
import json

from airflow.hooks.mysql_hook import MySqlHook


def store_data(**kwargs):
    ti = kwargs['ti']
    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    connection = MySqlHook(mysql_conn_id='mysql_default')
    for r in parsed_records:
        url = r['url']
        data = json.dumps(r)
        sql = 'INSERT INTO recipes(url, data) VALUES (%s, %s)'
        connection.run(sql, autocommit=True, parameters=(url, data))
    return True
CREATE TABLE recipes (
`id` int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
`url` varchar(100) NOT NULL,
`data` text NOT NULL,
PRIMARY KEY (`id`)
);
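To sanity-check the inserts, you can query the table back with the same hook; get_records comes from Airflow's DbApiHook, which MySqlHook inherits:

from airflow.hooks.mysql_hook import MySqlHook

connection = MySqlHook(mysql_conn_id='mysql_default')
# fetch the most recently stored recipes to verify the inserts
rows = connection.get_records('SELECT id, url FROM recipes ORDER BY id DESC LIMIT 5')
for row in rows:
    print(row)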
from airflow.operators.email_operator import EmailOperator

opr_email = EmailOperator(
    task_id='send_email',
    to='jon@yahoo.com',
    subject='Airflow Finished',
    html_content=""" <h3>DONE</h3> """,
    dag=dag
)
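To make the email go out after the data is stored, append it to the task chain:

# run send_email as the last task in the pipeline
opr_parse_recipes >> opr_download_image >> opr_store_data >> opr_email

For EmailOperator to actually send anything, the [smtp] section of airflow.cfg must be configured, for instance for a Gmail account: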
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = False
smtp_ssl = True
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = user@gmail.com
smtp_password = your_password
smtp_port = 465
smtp_mail_from = user@gmail.com
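With SMTP configured, Airflow can also mail you automatically when tasks fail or retry (as the config comment above mentions), without an explicit EmailOperator. A sketch of the relevant default_args keys, with a placeholder address:

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 10, 1, 10, 00, 00),
    'email': ['user@gmail.com'],  # placeholder address
    'email_on_failure': True,     # mail when a task fails
    'email_on_retry': True,       # mail when a task retries
    'retries': 0
}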

Conclusion

In this post you saw how to schedule a scraping pipeline as an Airflow DAG, pass data between tasks with XCom, store the parsed records in MySQL, and send a notification email when the run finishes.

If you like this post, you should subscribe to my newsletter.
