Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

amazon web services - Running a Spark job on AWS EMR

I am trying to run my first job on AWS EMR. I have created a 4-node cluster with the default settings. The job reads from and writes to AWS S3; the EMR cluster and the input and output S3 buckets are all in the same region. Below is the script I am running:

import configparser
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

config = configparser.ConfigParser()
config.read('dl.cfg')


# os.environ['AWS_ACCESS_KEY_ID'] = config['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    # spark = SparkSession \
    #     .builder.config("spark.driver.memory", "2g").config("spark.executor.memory", "8g").getOrCreate()
    # .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    spark = SparkSession \
        .builder.master("yarn").getOrCreate()

    return spark


def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    #song_data = f"{input_data}/song-data//*//*//*//*.json"
    song_data = f"{input_data}/song-data/*/*/*/*.json"

    # read song data file

    # extract song data
    # song_id , title , artist_id , year , duration
    song_data_schema = t.StructType(
        [
            t.StructField("num_songs", t.IntegerType(), True),
            t.StructField("artist_id", t.StringType(), True),
            t.StructField("artist_latitude", t.FloatType(), True),
            t.StructField("artist_longitude", t.FloatType(), True),
            t.StructField("artist_location", t.StringType(), True),
            t.StructField("artist_name", t.StringType(), True),
            t.StructField("song_id", t.StringType(), True),
            t.StructField("title", t.StringType(), True),
            t.StructField("duration", t.FloatType(), True),
            t.StructField("year", t.IntegerType(), True)
        ]

    )

    df = spark.read.schema(song_data_schema).json(song_data)

    songs_table = df.select("song_id", "title", "artist_id", "year", "duration")

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode(saveMode="overwrite").partitionBy("year", "artist_id").parquet("D://UdacityProjectDataOutput//song_table//")

    # extract columns to create artists table
    # artist_id, name, location, latitude, longitude

    artists_table = df.selectExpr("artist_id", "artist_name as name", "artist_location as location",
                                  "artist_latitude as latitude", "artist_longitude as longitude")

    # write artists table to parquet files
    artists_table.write.mode(saveMode="overwrite").parquet("D://UdacityProjectDataOutput//artists_table//")
    # artists_table.show()


def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data = f"{input_data}/log-data/*/*/*.json"

    log_schema = t.StructType(
        [
            t.StructField("artist", t.StringType(), True),
            t.StructField("auth", t.StringType(), True),
            t.StructField("firstName", t.StringType(), True),
            t.StructField("gender", t.StringType(), True),
            t.StructField("itemInSession", t.IntegerType(), True),
            t.StructField("lastName", t.StringType(), True),
            t.StructField("length", t.FloatType(), True),
            t.StructField("level", t.StringType(), True),
            t.StructField("location", t.StringType(), True),
            t.StructField("method", t.StringType(), True),
            t.StructField("page", t.StringType(), True),
            t.StructField("registration", t.DoubleType(), True),
            t.StructField("sessionId", t.IntegerType(), True),
            t.StructField("song", t.StringType(), True),
            t.StructField("status", t.IntegerType(), True),
            t.StructField("ts", t.LongType(), True),
            t.StructField("userAgent", t.StringType(), True),
            t.StructField("userId", t.IntegerType(), True),
        ]
    )

    # read log data file
    df = spark.read.schema(log_schema).json(log_data)

    # filter by actions for song plays
    event_df = df.filter("page = 'NextSong'").withColumn("ts_time", f.to_timestamp(f.col("ts") / 1000)).drop("ts")

    # event_df.cache()

    # extract columns for users table
    users_table = event_df.selectExpr("userId as user_id", "firstName as first_name", "lastName as last_name", "gender",
                                      "level")

    # write users table to parquet files
    users_table.write.mode(saveMode="overwrite").parquet("D://UdacityProjectDataOutput//user_table//")

    # #################### BELOW CODE NOT REQUIRED. SPARK ALREADY HAS A BUILT-IN FUNCTION TO GET THE TIMESTAMP
    # ################ create timestamp column from original timestamp column get_timestamp = udf() df =
    #
    # # create datetime column from original timestamp column
    # get_datetime = udf()
    # df =
    ################################# END #########################################

    start_time = f.date_format(event_df.ts_time, 'HH:mm:ss').alias('start_time')
    hour = f.hour(event_df.ts_time).alias("hour")
    day = f.dayofmonth(event_df.ts_time).alias("day")
    week = f.weekofyear(event_df.ts_time).alias("week")
    month = f.month(event_df.ts_time).alias("month")
    year = f.year(event_df.ts_time).alias("year")
    # is_weekday = F.dayofweek(df.ts).isin([1,7]) == False
    is_weekday = ~f.dayofweek(event_df.ts_time).isin([1, 7])

    time_table = event_df.select(start_time, hour, day, week, month, year, is_weekday.alias("weekday"))

    # write time table to parquet files partitioned by year and month
    time_table.write.mode(saveMode="overwrite").partitionBy("year", "month").parquet("D://UdacityProjectDataOutput//time_table//")

    # read in song data to use for songplays table
    input_path = "D://UdacityProjectDataOutput//"
    song_df = spark.read.parquet(f'{input_path}//song_table//*//*//*.parquet')
    artist_df = spark.read.parquet(f'{input_path}//artists_table//*.parquet').drop("location")

    # extract columns from joined song and log datasets to create songplays table

    songplays_DF = event_df.join(song_df, f.expr("song rlike title"))
    songplays_table = songplays_DF.join(artist_df, f.expr("artist rlike name")).withColumn("year", f.year(
        songplays_DF.ts_time)).withColumn("month", f.month(songplays_DF.ts_time))
    songplays_table = songplays_table.selectExpr("monotonically_increasing_id() as songplay_id",
                                                 "ts_time as start_time", "userId as user_id", "level", "song_id",
                                                 "artist_id",
                                                 "sessionId as session_id", "location", "userAgent as user_agent",
                                                 "year", "month")

    songplays_table.write.mode(saveMode="overwrite").partitionBy("year", "month").parquet("D://UdacityProjectDataOutput//songplays_table//")


def main():
    spark = create_spark_session()
    input_data = "s3a://udacity-dend/"
    #input_data = "D://UdacityProjectData"
    output_data = "s3a://spotifybucket2/"

    process_song_data(spark, input_data, output_data)
    process_log_data(spark, input_data, output_data)


if __name__ == "__main__":
    main()

I run the script by SSHing into the master node and running the command below:

/usr/bin/spark-submit --master yarn ./udacity_songs.py

However, nothing happens. After a few seconds, this is what I see in the console:

21/01/27 17:37:00 INFO TypeUtil: JVM Runtime does not support Modules
21/01/27 17:37:04 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 3
21/01/27 17:37:04 INFO YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 3
21/01/27 17:37:04 INFO ExecutorAllocationManager: Executors 3 removed due to idle timeout.(new desired total will be 2)
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 1
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 1
21/01/27 17:37:06 INFO ExecutorAllocationManager: Executors 1 removed due to idle timeout.(new desired total will be 1)
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 2
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 2
21/01/27 17:37:06 INFO ExecutorAllocationManager: Executors 2 removed due to idle timeout.(new desired total will be 0)
21/01/27 17:37:07 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1.
21/01/27 17:37:07 INFO DAGScheduler: Executor lost: 1 (epoch 0)
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-XXX-XX-XX-XX.us-west-2.compute.internal, 43847, None)
21/01/27 17:37:07 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
21/01/27 17:37:07 INFO YarnScheduler: Executor 1 on ip-XXX-XX-XX-XX.us-west-2.compute.internal killed by driver.
21/01/27 17:37:07 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 2.
21/01/27 17:37:07 INFO DAGScheduler: Executor lost: 2 (epoch 0)
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, ip-XXX-XX-XX-XX.us-west-2.compute.internal, 34215, None)
21/01/27 17:37:07 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
21/01/27 17:37:07 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
21/01/27 17:37:07 INFO DAGScheduler: Executor lost: 3 (epoch 0)
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3, ip-XXX-XX-XX-XX.us-west-2.compute.internal, 45221, None)
21/01/27 17:37:07 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
21/01/27 17:37:07 INFO YarnScheduler: Executor 2 on ip-XXX-XX-XX-XX.us-west-2.compute.internal killed by driver.
21/01/27 17:37:07 INFO YarnScheduler: Executor 3 on ip-XXX-XX-XX-XX.us-west-2.compute.internal killed by driver.

After I press Ctrl+C, this is what it shows:

^CTraceback (most recent call last):
  File "/home/hadoop/./udacity_songs.py", line 161, in <module>
    main()
  File "/home/hadoop/./udacity_songs.py", line 156, in main
    process_song_data(spark, input_data, output_data)
  File "/home/hadoop/./udacity_songs.py", line 49, in process_song_d
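
A side observation on the script, not a confirmed explanation of the hang: `input_data` ends in `/` and the f-strings add another, so the glob becomes `s3a://udacity-dend//song-data/...`, and the `output_data` argument is never used because every write goes to a hard-coded `D://` path. The last traceback frame (line 49) appears, by the same offset as the other two frames, to be the `spark.read.schema(...).json(song_data)` call. Below is a minimal sketch of building the paths with exactly one separator; `join_uri`, `song_glob` and `songs_out` are hypothetical names introduced here for illustration, and writing under `output_data` is an assumption about the intent.

```python
def join_uri(base: str, *parts: str) -> str:
    """Join URI segments with exactly one '/' between them (hypothetical helper)."""
    # Strip slashes at the seams so "bucket/" + "/key" does not become "bucket//key".
    cleaned = [base.rstrip("/")] + [p.strip("/") for p in parts if p.strip("/")]
    return "/".join(cleaned)


# Values copied from main() in the script above.
input_data = "s3a://udacity-dend/"
output_data = "s3a://spotifybucket2/"

# Input glob without the doubled slash after the bucket name.
song_glob = join_uri(input_data, "song-data", "*/*/*/*.json")

# Assumed intent: write under output_data rather than a local D:// path.
songs_out = join_uri(output_data, "song_table")

print(song_glob)
print(songs_out)
```

In the script, `song_glob` would replace the f-string passed to `.json(...)` and `songs_out` would replace the `D://...` argument to `.parquet(...)`; whether that alone resolves the idle executors is not something the logs above can confirm.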


