Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
115 views
in Technique[技术] by (71.8m points)

python - Creating a Data Pipeline to BigQuery Using Cloud Functions and Cloud Scheduler

I am trying to build a Data Pipeline that will download the data from this website and push it to a BigQuery Table.

def OH_Data_Pipeline(trigger='Yes'):
    if trigger=='Yes':
        import pandas as pd
        import pandas_gbq
        import datetime
        schema=[{'name': 'SOS_VOTERID', 'type': 'STRING'},{'name': 'COUNTY_NUMBER', 'type': 'STRING'}, {'name': 'COUNTY_ID', 'type': 'INT64'}, {'name': 'LAST_NAME', 'type': 'STRING'}, {'name': 'FIRST_NAME', 'type': 'STRING'}, {'name': 'MIDDLE_NAME', 'type': 'STRING'}, {'name': 'SUFFIX', 'type': 'STRING'}, {'name': 'DATE_OF_BIRTH', 'type': 'DATE'}, 
            {'name': 'REGISTRATION_DATE', 'type': 'DATE'}, {'name': 'VOTER_STATUS', 'type': 'STRING'}, 
            {'name': 'PARTY_AFFILIATION', 'type': 'STRING'}, {'name': 'RESIDENTIAL_ADDRESS1', 'type': 'STRING'}, 
            {'name': 'RESIDENTIAL_SECONDARY_ADDR', 'type': 'STRING'}, {'name': 'RESIDENTIAL_CITY', 'type': 'STRING'}, 
            {'name': 'RESIDENTIAL_STATE', 'type': 'STRING'}, {'name': 'RESIDENTIAL_ZIP', 'type': 'STRING'}, 
            {'name': 'RESIDENTIAL_ZIP_PLUS4', 'type': 'STRING'}, {'name': 'RESIDENTIAL_COUNTRY', 'type': 'STRING'}, 
            {'name': 'RESIDENTIAL_POSTALCODE', 'type': 'STRING'}, {'name': 'MAILING_ADDRESS1', 'type': 'STRING'}, 
            {'name': 'MAILING_SECONDARY_ADDRESS', 'type': 'STRING'}, {'name': 'MAILING_CITY', 'type': 'STRING'}, 
            {'name': 'MAILING_STATE', 'type': 'STRING'}, {'name': 'MAILING_ZIP', 'type': 'STRING'}, 
            {'name': 'MAILING_ZIP_PLUS4', 'type': 'STRING'}, {'name': 'MAILING_COUNTRY', 'type': 'STRING'}, 
            {'name': 'MAILING_POSTAL_CODE', 'type': 'STRING'}, {'name': 'CAREER_CENTER', 'type': 'STRING'}, 
            {'name': 'CITY', 'type': 'STRING'}, {'name': 'CITY_SCHOOL_DISTRICT', 'type': 'STRING'}, 
            {'name': 'COUNTY_COURT_DISTRICT', 'type': 'STRING'}, {'name': 'CONGRESSIONAL_DISTRICT', 'type': 'STRING'}, 
            {'name': 'COURT_OF_APPEALS', 'type': 'STRING'}, {'name': 'EDU_SERVICE_CENTER_DISTRICT', 'type': 'STRING'}, 
            {'name': 'EXEMPTED_VILL_SCHOOL_DISTRICT', 'type': 'STRING'}, {'name': 'LIBRARY', 'type': 'STRING'}, 
            {'name': 'LOCAL_SCHOOL_DISTRICT', 'type': 'STRING'}, {'name': 'MUNICIPAL_COURT_DISTRICT', 'type': 'STRING'}, 
            {'name': 'PRECINCT_NAME', 'type': 'STRING'}, {'name': 'PRECINCT_CODE', 'type': 'STRING'}, 
            {'name': 'STATE_BOARD_OF_EDUCATION', 'type': 'STRING'}, {'name': 'STATE_REPRESENTATIVE_DISTRICT', 'type': 'STRING'}, 
            {'name': 'STATE_SENATE_DISTRICT', 'type': 'STRING'}, {'name': 'TOWNSHIP', 'type': 'STRING'}, 
            {'name': 'VILLAGE', 'type': 'STRING'}, {'name': 'WARD', 'type': 'STRING'}, 
            {'name': 'PRIMARY_03_07_2000', 'type': 'STRING'}, {'name': 'GENERAL_11_07_2000', 'type': 'INT64'}, 
            {'name': 'SPECIAL_05_08_2001', 'type': 'STRING'}, {'name': 'GENERAL_11_06_2001', 'type': 'INT64'}, 
            {'name': 'PRIMARY_05_07_2002', 'type': 'STRING'}, {'name': 'GENERAL_11_05_2002', 'type': 'INT64'}, 
            {'name': 'SPECIAL_05_06_2003', 'type': 'STRING'}, {'name': 'GENERAL_11_04_2003', 'type': 'INT64'}, 
            {'name': 'PRIMARY_03_02_2004', 'type': 'STRING'}, {'name': 'GENERAL_11_02_2004', 'type': 'INT64'}, 
            {'name': 'SPECIAL_02_08_2005', 'type': 'STRING'}, {'name': 'PRIMARY_05_03_2005', 'type': 'STRING'}, 
            {'name': 'PRIMARY_09_13_2005', 'type': 'STRING'}, {'name': 'GENERAL_11_08_2005', 'type': 'INT64'}, 
            {'name': 'SPECIAL_02_07_2006', 'type': 'STRING'}, {'name': 'PRIMARY_05_02_2006', 'type': 'STRING'}, 
            {'name': 'GENERAL_11_07_2006', 'type': 'INT64'}, {'name': 'PRIMARY_05_08_2007', 'type': 'STRING'}, 
            {'name': 'PRIMARY_09_11_2007', 'type': 'STRING'}, {'name': 'GENERAL_11_06_2007', 'type': 'INT64'}, 
            {'name': 'PRIMARY_11_06_2007', 'type': 'STRING'}, {'name': 'GENERAL_12_11_2007', 'type': 'INT64'}, 
            {'name': 'PRIMARY_03_04_2008', 'type': 'STRING'}, {'name': 'PRIMARY_10_14_2008', 'type': 'STRING'}, 
            {'name': 'GENERAL_11_04_2008', 'type': 'INT64'}, {'name': 'GENERAL_11_18_2008', 'type': 'INT64'}, 
            {'name': 'PRIMARY_05_05_2009', 'type': 'STRING'}, {'name': 'PRIMARY_09_08_2009', 'type': 'STRING'}, 
            {'name': 'PRIMARY_09_15_2009', 'type': 'STRING'}, {'name': 'PRIMARY_09_29_2009', 'type': 'STRING'}, 
            {'name': 'GENERAL_11_03_2009', 'type': 'INT64'}, {'name': 'PRIMARY_05_04_2010', 'type': 'STRING'}, 
            {'name': 'PRIMARY_07_13_2010', 'type': 'STRING'}, {'name': 'PRIMARY_09_07_2010', 'type': 'STRING'}, 
            {'name': 'GENERAL_11_02_2010', 'type': 'INT64'}, {'name': 'PRIMARY_05_03_2011', 'type': 'STRING'}, 
            {'name': 'PRIMARY_09_13_2011', 'type': 'STRING'}, {'name': 'GENERAL_11_08_2011', 'type': 'INT64'}, 
            {'name': 'PRIMARY_03_06_2012', 'type': 'STRING'}, {'name': 'GENERAL_11_06_2012', 'type': 'INT64'}, 
            {'name': 'PRIMARY_05_07_2013', 'type': 'STRING'}, {'name': 'PRIMARY_09_10_2013', 'type': 'STRING'}, 
            {'name': 'PRIMARY_10_01_2013', 'type': 'STRING'}, {'name': 'GENERAL_11_05_2013', 'type': 'INT64'}, 
            {'name': 'PRIMARY_05_06_2014', 'type': 'STRING'}, {'name': 'GENERAL_11_04_2014', 'type': 'INT64'}, 
            {'name': 'PRIMARY_05_05_2015', 'type': 'STRING'}, {'name': 'PRIMARY_09_15_2015', 'type': 'STRING'}, 
            {'name': 'GENERAL_11_03_2015', 'type': 'INT64'}, {'name': 'PRIMARY_03_15_2016', 'type': 'STRING'}, 
            {'name': 'GENERAL_06_07_2016', 'type': 'INT64'}, {'name': 'PRIMARY_09_13_2016', 'type': 'STRING'}, 
            {'name': 'GENERAL_11_08_2016', 'type': 'INT64'}, {'name': 'PRIMARY_05_02_2017', 'type': 'STRING'}, 
            {'name': 'PRIMARY_09_12_2017', 'type': 'STRING'}, {'name': 'GENERAL_11_07_2017', 'type': 'INT64'}, 
            {'name': 'PRIMARY_05_08_2018', 'type': 'STRING'}, {'name': 'GENERAL_08_07_2018', 'type': 'INT64'}, 
            {'name': 'GENERAL_11_06_2018', 'type': 'INT64'}, {'name': 'PRIMARY_05_07_2019', 'type': 'STRING'}, 
            {'name': 'PRIMARY_09_10_2019', 'type': 'STRING'}, {'name': 'GENERAL_11_05_2019', 'type': 'INT64'}]
        prim_list = ['PRIMARY-03/07/2000', 'SPECIAL-05/08/2001', 'PRIMARY-05/07/2002', 'SPECIAL-05/06/2003', 'PRIMARY-03/02/2004', 
                'SPECIAL-02/08/2005', 'PRIMARY-05/03/2005', 'PRIMARY-09/13/2005', 'SPECIAL-02/07/2006', 'PRIMARY-05/02/2006', 
                'PRIMARY-05/08/2007', 'PRIMARY-09/11/2007', 'PRIMARY-11/06/2007', 'PRIMARY-03/04/2008', 'PRIMARY-10/14/2008', 
                'PRIMARY-05/05/2009', 'PRIMARY-09/08/2009', 'PRIMARY-09/15/2009', 'PRIMARY-09/29/2009', 'PRIMARY-05/04/2010', 
                'PRIMARY-07/13/2010', 'PRIMARY-09/07/2010', 'PRIMARY-05/03/2011', 'PRIMARY-09/13/2011', 'PRIMARY-03/06/2012', 
                'PRIMARY-05/07/2013', 'PRIMARY-09/10/2013', 'PRIMARY-10/01/2013', 'PRIMARY-05/06/2014', 'PRIMARY-05/05/2015', 
                'PRIMARY-09/15/2015', 'PRIMARY-03/15/2016', 'PRIMARY-09/13/2016', 'PRIMARY-05/02/2017', 'PRIMARY-09/12/2017', 
                'PRIMARY-05/08/2018', 'PRIMARY-05/07/2019', 'PRIMARY-09/10/2019']
        prim_list = [f.replace('-', '_').replace('/', '_') for f in prim_list]
        gen_list = ['GENERAL-11/07/2000', 'GENERAL-11/06/2001', 'GENERAL-11/05/2002', 'GENERAL-11/04/2003', 'GENERAL-11/02/2004', 
               'GENERAL-11/08/2005', 'GENERAL-11/07/2006', 'GENERAL-11/06/2007', 'GENERAL-12/11/2007', 'GENERAL-11/04/2008', 
               'GENERAL-11/18/2008', 'GENERAL-11/03/2009', 'GENERAL-11/02/2010', 'GENERAL-11/08/2011', 'GENERAL-11/06/2012', 
               'GENERAL-11/05/2013', 'GENERAL-11/04/2014', 'GENERAL-11/03/2015', 'GENERAL-06/07/2016', 'GENERAL-11/08/2016', 
               'GENERAL-11/07/2017', 'GENERAL-08/07/2018', 'GENERAL-11/06/2018', 'GENERAL-11/05/2019']
        gen_list = [f.replace('-', '_').replace('/', '_') for f in gen_list]
        party_list = ['PARTY_AFFILIATION']
        df=[pd.read_csv('https://www6.sos.state.oh.us/ords/f?p=VOTERFTP:DOWNLOAD::FILE:NO:2:P2_PRODUCT_NUMBER:{}'.format(88+f), encoding='Latin1', low_memory=False) for f in range(1, 17)]
        df=pd.concat(df)
        df.columns = [f.replace('-', '_').replace('/', '_') for f in df.columns]
        df['birth_year'] = df['DATE_OF_BIRTH'].map(lambda x: str(x)[:-6]).astype(int)
        df['Age'] = now.year - df['birth_year']
        for f in prim_list:
            df.loc[df[f]=='D', f]='Democrat'
            df.loc[df[f]=='R', f]='Republican'
            df.loc[df[f]=='G', f]='Green'
            df.loc[df[f]=='E', f]='Reform'
            df.loc[df[f]=='L', f]='Libertarian'
            df.loc[df[f]=='C', f]='Constitution'
            df.loc[df[f]=='N', f]='Natural Law'
            df.loc[df[f]=='S', f]='Socialist'
            df.loc[df[f]=='X', f]='Without Affiliation'
            df.loc[(df[f]=='') | (df[f].isnull()==True) | (df[f]==0), f]='Not Voted'
        for f in party_list:
            df.loc[df[f]=='D', f]='Democrat'
            df.loc[df[f]=='R', f]='Republican'
            df.loc[df[f]=='G', f]='Green'
            df.loc[df[f]=='E', f]='Reform'
            df.loc[df[f]=='L', f]='Libertarian'
            df.loc[df[f]=='C', f]='Constitution'
            df.loc[df[f]=='N', f]='Natural Law'
            df.loc[df[f]=='S', f]='Socialist'
            df.loc[df[f]=='X', f]='Unaffiliated'
            df.loc[(df[f]=='') | (df[f].isnull()==True) | (df[f]==0), f]='Unaffiliated'
        for g in gen_list:  
            df.loc[(df[g]!='') & (df[g].isnull()!=True) & (df[g]!=0) & (df[g]!='NaN'), g]=1
            df.loc[(df[g]=='') | (df[g].isnull()==True) | (df[g]==0) | (df[g]=='NaN'), g]=0
        df[gen_list]=df[gen_list].astype(int)
        df[prim_list]=df[prim_list].astype(str)
        df[party_list]=df[party_list].astype(str)
        df.to_gbq(destination_table='Voterfile.OH_Voterfile', project_id='oh-data-pipeline', if_exists='replace', table_schema=schema, reauth=False)
    else:
        pass

The problem is after defining the function in cloud functions, I will run the script in cloud scheduler, it will say the function ran, but no data will show up in BigQuery.

enter image description here

enter image description here

enter image description here

Here are the logs as well:[ { "insertId": "1idtfdbg5drzu63", "jsonPayload": { "targetType": "HTTP", "url": "https://us-central1-oh-data-pipeline.cloudfunctions.net/OH_Data_Pipeline", "@type": "type.googleapis.com/google.cloud.scheduler.logging.AttemptFinished", "jobName": "projects/oh-data-pipeline/locations/us-east4/jobs/OH_Voterfile_Data_Loader" }, "httpRequest": { "status": 200 }, "resource": { "type": "cloud_scheduler_job", "labels": { "project_id": "oh-data-pipeline", "location": "us-east4", "job_id": "OH_Voterfile_Data_Loader" } }, "timestamp": "2020-01-01T21:12:39.949108697Z", "severity": "INFO", "logName": "projects/oh-data-pipeline/logs/cloudscheduler.googleapis.com%2Fexecutions", "receiveTimestamp": "2020-01-01T21:12:39.949108697Z" }, { "insertId": "k9f9cjg5ds4bft", "jsonPayload": { "jobName": "projects/oh-data-pipeline/locations/us-east4/jobs/OH_Voterfile


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

1.4m articles

1.4m replys

5 comments

56.9k users

...