Friday, August 19, 2016

Pandas DataFrame as a Process Tracker (postgres example)

Sometimes you need to keep track of the number of rows processed for a given table.

Let's assume you are working in postgres and you want to do row-by-row operations for some sort of data manipulation.  Your user requires you to keep track of each row's changes and wants to know the number of failed updates and the number of successful updates. The output must be a text file with pretty formatting.

There are many ways to accomplish this task, but let's use pandas, the arcpy.da.UpdateCursor, and some SQL.


import arcpy
import pandas as pd

#--------------------------------------------------------------------------
def create_tracking_table(sde, tables):
    """
    creates a pandas dataframe from a sql statement
    Input:
       sde - sde connection file
       tables - names of the tables to get the counts for
    Output:
       pandas DataFrame with column names: Table_Name, Total_Rows,
       Processed and Errors
    """
    # the connected user's name doubles as the schema name in this setup
    desc = arcpy.Describe(sde)
    connectionProperties = desc.connectionProperties
    username = connectionProperties.user
    # pg_class.reltuples is postgres' row count estimate (kept current by VACUUM/ANALYZE)
    sql = """SELECT
       nspname AS schemaname,relname,reltuples
    FROM pg_class C
     LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)
    WHERE
       nspname NOT IN ('pg_catalog', 'information_schema') AND
       relkind='r' AND
       nspname='{schema}' AND
       relname in ({tables})
    ORDER BY reltuples DESC;""".format(
                                   schema=username,
                                   tables=",".join(["'%s'" % t for t in tables])
                               )
    columns = ['schemaname','Table_Name','Total_Rows']

    # run the query over the sde connection and load the rows into a dataframe
    con = arcpy.ArcSDESQLExecute(sde)
    rows = con.execute(sql)
    count_df = pd.DataFrame.from_records(rows, columns=columns)
    del count_df['schemaname']
    # tracking columns: successful updates and failed updates
    count_df['Processed'] = 0
    count_df['Errors'] = 0
    return count_df



Now we have a function that will return a DataFrame object from a SQL statement.  It contains four columns: Table_Name, Total_Rows, Processed, and Errors.  Table_Name is the name of the table in the database.  Total_Rows is the row count reported by pg_class.reltuples (postgres' estimate, kept current by VACUUM/ANALYZE).  Processed is incremented every time a row is updated successfully.  Errors is incremented every time an update fails.
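
To make the structure concrete, here is a minimal sketch of what the tracking frame looks like, built by hand with made-up table names and row counts instead of from the sde connection:

import pandas as pd

# hypothetical tables and counts, purely to illustrate the layout
count_df = pd.DataFrame([['parcels', 1500], ['roads', 320]],
                        columns=['Table_Name', 'Total_Rows'])
count_df['Processed'] = 0
count_df['Errors'] = 0
# count_df now holds one row per table: Table_Name, Total_Rows, Processed, Errors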

So let's use what we just made:

count_df = create_tracking_table(sde, tables)
for table in tables:
   with arcpy.da.UpdateCursor(table, "*") as urows:
      for urow in urows:
         try:
            urow[3] += 1  # whatever per-row edit you need to make
            urows.updateRow(urow)
            count_df.loc[count_df['Table_Name'] == table, 'Processed'] += 1
         except Exception:
            count_df.loc[count_df['Table_Name'] == table, 'Errors'] += 1

The pseudo code above shows that whenever an exception is raised, 'Errors' gets 1 added to it, and whenever a row is updated successfully, 'Processed' gets incremented.
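
One optional addition, not part of the original workflow: since the frame already carries Total_Rows, you can sanity-check a run afterwards by comparing it to Processed plus Errors. Keep in mind Total_Rows comes from reltuples, which is only an estimate, so small differences are normal unless the table statistics are fresh.

# not in the original post -- a quick post-run sanity check
count_df['Unaccounted'] = (count_df['Total_Rows']
                           - count_df['Processed']
                           - count_df['Errors'])
print(count_df[count_df['Unaccounted'] != 0])  # tables where the counts don't line up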

The third part of the task was to output the count table to a text file, which can be done easily using the to_string() method.

out_txt = 'row_counts.txt'  # hypothetical output path; use whatever the user asked for
with open(out_txt, 'w') as writer:
   writer.write(count_df.to_string(index=False, col_space=12, justify='left'))
   writer.flush()

So there you have it.  We have a nice, human-readable output table in a text file.

Enjoy