
This post has been de-listed

It is no longer included in search results and normal feeds (front page, hot posts, subreddit posts, etc). It remains visible only via the author's post history.

Apache Airflow PostgreSQL not Committing

Hi,
I'm learning Airflow and have some simple tasks running. I'm having a serious problem: the update task runs successfully, but no changes show up in the database.

This is all running on my laptop, so there are no networking complications. It's just a way to get my feet wet with Airflow.

This task finds files in the pickup directory. Each file contains a query like this:

```sql
UPDATE public.public_statistics set date_exported = CURRENT_TIMESTAMP where county_fips in (31109); COMMIT;
```
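The trailing `COMMIT;` matters because another session (such as psql) only sees an update once the transaction that made it has committed. As a general illustration of that rule, here is a minimal sketch that uses Python's built-in `sqlite3` module as a stand-in for PostgreSQL (an assumption: SQLite's locking differs from PostgreSQL's, but uncommitted changes are invisible to other connections in both). The table mirrors the query above; the function names `read_exported` and `commit_visibility_demo` are hypothetical.

```python
import contextlib
import os
import sqlite3
import tempfile

# Same shape as the query in the pickup file (SQLite has no "public" schema).
QUERY = (
    "UPDATE public_statistics SET date_exported = CURRENT_TIMESTAMP "
    "WHERE county_fips IN (31109)"
)


def read_exported(path):
    """Read the row through a separate connection, standing in for psql."""
    with contextlib.closing(sqlite3.connect(path)) as other_session:
        row = other_session.execute(
            "SELECT date_exported FROM public_statistics WHERE county_fips = 31109"
        ).fetchone()
    return row[0]


def commit_visibility_demo():
    """Return (value another session sees before commit, value after commit)."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    try:
        # Hypothetical fixture table with one un-exported row.
        with contextlib.closing(sqlite3.connect(path)) as setup:
            setup.execute(
                "CREATE TABLE public_statistics (county_fips INTEGER, date_exported TEXT)"
            )
            setup.execute("INSERT INTO public_statistics VALUES (31109, NULL)")
            setup.commit()

        with contextlib.closing(sqlite3.connect(path)) as writer:
            writer.execute(QUERY)  # sqlite3 opens an implicit transaction here
            before = read_exported(path)  # still NULL: nothing committed yet
            writer.commit()
            after = read_exported(path)  # now holds the CURRENT_TIMESTAMP text
        return before, after
    finally:
        os.remove(path)
```

Calling `commit_visibility_demo()` should return `None` for the first value and a timestamp string for the second: the `UPDATE` "succeeds" in both cases, but only the committed version is visible elsewhere.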

When I run the same query in psql, the update happens, so there are no syntax issues. Also, it appears that PostgresOperator and PostgresHandler are being deprecated in favor of SQLExecuteQueryOperator. I'm not sure whether that has anything to do with my issue.

Pretty sure I am missing something small. Any advice is welcome.

```python
import os

# from airflow.operators.postgres_operator import PostgresOperator
from airflow.exceptions import AirflowException
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


def update_pg_row():
    os.chdir('/home/sandbox/airflow/dags/datasets/pickup')

    # KISS: any file here has not been imported to snowflake
    process_file_list = [i for i in os.listdir("./") if i.startswith('update_row_')]

    if len(process_file_list) == 0:
        print("no fips update files to process")
        return

    print(process_file_list)
    print("length of file_list {}".format(len(process_file_list)))
    for f in process_file_list:
        file_stats = os.stat(f)
        print("processing {}".format(f))
        if file_stats.st_size > 10:
            # Read the query text; the context manager closes the file.
            with open(f, "r") as fh:
                qry = fh.read()
            print("!!!!!!!!!!!! DEBUG !!!!!!!!!!!!!!!")
            print(qry)
            try:
                conn_op = SQLExecuteQueryOperator(
                    conn_id='pg_for_me',
                    task_id='update-pg-row-as-synced',
                    autocommit=True,
                    sql=f,
                )
            except Exception as e:
                print('cannot get queryoperator')
                raise AirflowException(e) from e
        else:
            # should never be here.
            print("{} is too small??  {}".format(f, file_stats.st_size))
```
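One likely explanation: constructing `SQLExecuteQueryOperator` inside a Python callable only builds an operator object. Nothing in the callable calls that operator's `execute()`, so the function returns normally and the task is marked successful without sending anything to PostgreSQL. (Separately, `sql=f` passes the file name rather than the query text read into `qry`.) The usual options are to declare the operator at DAG level so the scheduler runs it as its own task, or to run the SQL through a hook inside the callable. Below is a minimal sketch of the hook approach, assuming the `apache-airflow-providers-postgres` package and its `PostgresHook`, whose `run(sql, autocommit=...)` method comes from the common SQL `DbApiHook`. The helper name `run_pickup_updates` is hypothetical; taking the hook as a parameter keeps it testable without Airflow.

```python
import os
from typing import Any, List


def run_pickup_updates(pickup_dir: str, hook: Any) -> List[str]:
    """Run every update_row_* file in pickup_dir through hook.run().

    `hook` is assumed to expose DbApiHook-style run(sql, autocommit=...),
    e.g. PostgresHook(postgres_conn_id="pg_for_me") inside Airflow.
    Returns the names of the files that were executed.
    """
    executed = []
    for name in sorted(os.listdir(pickup_dir)):
        if not name.startswith("update_row_"):
            continue
        path = os.path.join(pickup_dir, name)
        size = os.path.getsize(path)
        if size <= 10:
            print("{} is too small?? {}".format(name, size))
            continue
        with open(path, "r") as fh:
            qry = fh.read()
        # Unlike merely constructing an operator, this call actually sends
        # the SQL text (not the file name) to the database.
        hook.run(qry, autocommit=True)
        executed.append(name)
    return executed


def update_pg_row():
    # Imported lazily so run_pickup_updates can be tested without Airflow.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="pg_for_me")
    done = run_pickup_updates("/home/sandbox/airflow/dags/datasets/pickup", hook)
    if not done:
        print("no fips update files to process")
```

With `autocommit=True` the trailing `COMMIT;` in each file is redundant; PostgreSQL may log a "no transaction in progress" warning for it, which is harmless.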

Posted: 1 year ago