Hi,
I'm learning Airflow and have some simple tasks running. My problem is that the update task reports success, but no changes appear in the database.
This is all running on my laptop, so there are no networking complications. It's just a way to get my feet wet with Airflow.
The task looks for files in the pickup directory. Each file contains a query, like so:
UPDATE public.public_statistics set date_exported = CURRENT_TIMESTAMP where county_fips in (31109); COMMIT;
When I run the query in psql, the update occurs, so there are no syntax issues. Also, it appears that the PostgresOperator and PostgresHandler are being deprecated in favor of SQLExecuteQueryOperator? Not sure if that has anything to do with my issue.
Pretty sure I am missing something small. Any advice is welcome.
import json
import sys
import os
from airflow.models import Variable
from airflow.models.xcom import XCom
#from airflow.operators.postgres_operator import PostgresOperator
from airflow import AirflowException
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
def update_pg_row():
    os.chdir('/home/sandbox/airflow/dags/datasets/pickup')
    process_file_list = list()
    all_files_list = os.listdir("./")
    #KISS any file here has not been imported to snowflake
    for i in all_files_list:
        if i.startswith('update_row_'):
            process_file_list.append(i)
    if len(process_file_list) == 0:
        print("no fips update files to process")
        return()
    print(process_file_list)
    print("length of file_list {}".format(len(process_file_list)))
    for f in process_file_list:
        file_stats = os.stat(f)
        print("processing {}".format(f))
        if file_stats.st_size > 10:
            fh = open(f, "r")
            qry = fh.read()
            print("!!!!!!!!!!!! DEBUG !!!!!!!!!!!!!!!")
            print(qry)
            try:
                conn_op = SQLExecuteQueryOperator(
                    conn_id='pg_for_me',
                    task_id='update-pg-row-as-synced',
                    autocommit=True,
                    sql=f
                )
            except Exception as e:
                print('cannot get queryoperator')
                raise AirflowException(e)
        else:
            #should never be here.
            print("{} is too small?? {}".format(f,file_stats.st_size))
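Two things in the code above may be worth checking. First, creating a SQLExecuteQueryOperator inside a Python callable only constructs the object. Nothing in this function calls its execute() method, so no SQL is sent. Second, sql=f passes the file name rather than the query text held in qry. Below is a minimal sketch of one alternative, assuming the function receives a callable that actually runs the SQL. The names run_update_files and run_sql are hypothetical and not part of the original code or of Airflow.

```python
from pathlib import Path
from typing import Callable, List


def run_update_files(pickup_dir: str, run_sql: Callable[[str], None]) -> List[str]:
    """Pass the SQL text of each update_row_* file in pickup_dir to run_sql.

    run_sql is a hypothetical, caller-supplied callable. Inside an Airflow
    task it could be something like (an assumption, not tested here):
        lambda q: PostgresHook(postgres_conn_id="pg_for_me").run(q, autocommit=True)
    with PostgresHook imported from airflow.providers.postgres.hooks.postgres.
    """
    processed = []
    for path in sorted(Path(pickup_dir).glob("update_row_*")):
        size = path.stat().st_size
        if size <= 10:
            # Same size guard as the original function.
            print("{} is too small?? {}".format(path.name, size))
            continue
        qry = path.read_text()
        run_sql(qry)  # hand over the query text, not the file name
        processed.append(path.name)
    return processed
```

Keeping the database call behind a plain callable also makes the file handling easy to check without a database, for example by passing a list's append method as run_sql and inspecting what was collected.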