I am trying to run my first job on AWS EMR. I have created a 4-node cluster with the default settings.
I am reading from and writing to AWS S3. The EMR cluster and the input and output S3 buckets are in the same region. Below is the script I am running:
import configparser
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
config = configparser.ConfigParser()
config.read('dl.cfg')
# os.environ['AWS_ACCESS_KEY_ID'] = config['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS_SECRET_ACCESS_KEY']
def create_spark_session():
    # spark = SparkSession
    #     .builder.config("spark.driver.memory", "2g").config("spark.executor.memory", "8g").getOrCreate()
    #     .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    spark = SparkSession \
        .builder.master("yarn").getOrCreate()
    return spark
def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    # song_data = f"{input_data}/song-data//*//*//*//*.json"
    song_data = f"{input_data}/song-data/*/*/*/*.json"
    # read song data file
    # extract song data:
    # song_id, title, artist_id, year, duration
    song_data_schema = t.StructType(
        [
            t.StructField("num_songs", t.IntegerType(), True),
            t.StructField("artist_id", t.StringType(), True),
            t.StructField("artist_latitude", t.FloatType(), True),
            t.StructField("artist_longitude", t.FloatType(), True),
            t.StructField("artist_location", t.StringType(), True),
            t.StructField("artist_name", t.StringType(), True),
            t.StructField("song_id", t.StringType(), True),
            t.StructField("title", t.StringType(), True),
            t.StructField("duration", t.FloatType(), True),
            t.StructField("year", t.IntegerType(), True)
        ]
    )
    df = spark.read.schema(song_data_schema).json(song_data)
    songs_table = df.select("song_id", "title", "artist_id", "year", "duration")
    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode(saveMode="overwrite").partitionBy("year", "artist_id").parquet("D://UdacityProjectDataOutput//song_table//")
    # extract columns to create artists table:
    # artist_id, name, location, latitude, longitude
    artists_table = df.selectExpr("artist_id", "artist_name as name", "artist_location as location",
                                  "artist_latitude as latitude", "artist_longitude as longitude")
    # write artists table to parquet files
    artists_table.write.mode(saveMode="overwrite").parquet("D://UdacityProjectDataOutput//artists_table//")
    # artists_table.show()
def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data = f"{input_data}/log-data/*/*/*.json"
    log_schema = t.StructType(
        [
            t.StructField("artist", t.StringType(), True),
            t.StructField("auth", t.StringType(), True),
            t.StructField("firstName", t.StringType(), True),
            t.StructField("gender", t.StringType(), True),
            t.StructField("itemInSession", t.IntegerType(), True),
            t.StructField("lastName", t.StringType(), True),
            t.StructField("length", t.FloatType(), True),
            t.StructField("level", t.StringType(), True),
            t.StructField("location", t.StringType(), True),
            t.StructField("method", t.StringType(), True),
            t.StructField("page", t.StringType(), True),
            t.StructField("registration", t.DoubleType(), True),
            t.StructField("sessionId", t.IntegerType(), True),
            t.StructField("song", t.StringType(), True),
            t.StructField("status", t.IntegerType(), True),
            t.StructField("ts", t.LongType(), True),
            t.StructField("userAgent", t.StringType(), True),
            t.StructField("userId", t.IntegerType(), True),
        ]
    )
    # read log data file
    df = spark.read.schema(log_schema).json(log_data)
    # filter by actions for song plays
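    # ts is in milliseconds since the epoch, so it is divided by 1000 before converting to a timestamp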
    event_df = df.filter("page = 'NextSong'").withColumn("ts_time", f.to_timestamp(f.col("ts") / 1000)).drop("ts")
    # event_df.cache()
    # extract columns for users table
    users_table = event_df.selectExpr("userId as user_id", "firstName as first_name", "lastName as last_name",
                                      "gender", "level")
    # write users table to parquet files
    users_table.write.mode(saveMode="overwrite").parquet("D://UdacityProjectDataOutput//user_table//")
    # ############ Not required: Spark already has built-in functions to get the timestamp ############
    # # create timestamp column from original timestamp column
    # get_timestamp = udf()
    # df =
    # # create datetime column from original timestamp column
    # get_datetime = udf()
    # df =
    # ############################################ END ################################################
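    # derive the time dimension columns from the event timestamp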
    start_time = f.date_format(event_df.ts_time, 'HH:mm:ss').alias('start_time')
    hour = f.hour(event_df.ts_time).alias("hour")
    day = f.dayofmonth(event_df.ts_time).alias("day")
    week = f.weekofyear(event_df.ts_time).alias("week")
    month = f.month(event_df.ts_time).alias("month")
    year = f.year(event_df.ts_time).alias("year")
    # is_weekday = F.dayofweek(df.ts).isin([1,7]) == False
    is_weekday = ~f.dayofweek(event_df.ts_time).isin([1, 7])
    time_table = event_df.select(start_time, hour, day, week, month, year, is_weekday.alias("weekday"))
    # write time table to parquet files partitioned by year and month
    time_table.write.mode(saveMode="overwrite").partitionBy("year", "month").parquet("D://UdacityProjectDataOutput//time_table//")
    # read in song data to use for songplays table
    input_path = "D://UdacityProjectDataOutput//"
    song_df = spark.read.parquet(f'{input_path}//song_table//*//*//*.parquet')
    artist_df = spark.read.parquet(f'{input_path}//artists_table//*.parquet').drop("location")
    # extract columns from joined song and log datasets to create songplays table
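    # the joins match the played song title and artist name against the song/artist tables via rlike (regex match)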
    songplays_DF = event_df.join(song_df, f.expr("song rlike title"))
    songplays_table = songplays_DF.join(artist_df, f.expr("artist rlike name")) \
        .withColumn("year", f.year(songplays_DF.ts_time)) \
        .withColumn("month", f.month(songplays_DF.ts_time))
    songplays_table = songplays_table.selectExpr("monotonically_increasing_id() as songplay_id",
                                                 "ts_time as start_time", "userId as user_id", "level", "song_id",
                                                 "artist_id",
                                                 "sessionId as session_id", "location", "userAgent as user_agent",
                                                 "year", "month")
    songplays_table.write.mode(saveMode="overwrite").partitionBy("year", "month").parquet("D://UdacityProjectDataOutput//songplays_table//")
def main():
    spark = create_spark_session()
    input_data = "s3a://udacity-dend/"
    # input_data = "D://UdacityProjectData"
    output_data = "s3a://spotifybucket2/"
    process_song_data(spark, input_data, output_data)
    process_log_data(spark, input_data, output_data)

if __name__ == "__main__":
    main()
I am running the script by SSHing into the master node and running the command below:
/usr/bin/spark-submit --master yarn ./udacity_songs.py
However, nothing happens. After a few seconds, this is what I see in the console:
21/01/27 17:37:00 INFO TypeUtil: JVM Runtime does not support Modules
21/01/27 17:37:04 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 3
21/01/27 17:37:04 INFO YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 3
21/01/27 17:37:04 INFO ExecutorAllocationManager: Executors 3 removed due to idle timeout.(new desired total will be 2)
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 1
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 1
21/01/27 17:37:06 INFO ExecutorAllocationManager: Executors 1 removed due to idle timeout.(new desired total will be 1)
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Requesting to kill executor(s) 2
21/01/27 17:37:06 INFO YarnClientSchedulerBackend: Actual list of executor(s) to be killed is 2
21/01/27 17:37:06 INFO ExecutorAllocationManager: Executors 2 removed due to idle timeout.(new desired total will be 0)
21/01/27 17:37:07 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1.
21/01/27 17:37:07 INFO DAGScheduler: Executor lost: 1 (epoch 0)
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-XXX-XX-XX-XX.us-west-2.compute.internal, 43847, None)
21/01/27 17:37:07 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
21/01/27 17:37:07 INFO YarnScheduler: Executor 1 on ip-XXX-XX-XX-XX.us-west-2.compute.internal killed by driver.
21/01/27 17:37:07 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 2.
21/01/27 17:37:07 INFO DAGScheduler: Executor lost: 2 (epoch 0)
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, ip-XXX-XX-XX-XX.us-west-2.compute.internal, 34215, None)
21/01/27 17:37:07 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
21/01/27 17:37:07 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
21/01/27 17:37:07 INFO DAGScheduler: Executor lost: 3 (epoch 0)
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
21/01/27 17:37:07 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3, ip-XXX-XX-XX-XX.us-west-2.compute.internal, 45221, None)
21/01/27 17:37:07 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
21/01/27 17:37:07 INFO YarnScheduler: Executor 2 on ip-XXX-XX-XX-XX.us-west-2.compute.internal killed by driver.
21/01/27 17:37:07 INFO YarnScheduler: Executor 3 on ip-XXX-XX-XX-XX.us-west-2.compute.internal killed by driver.
And after pressing Ctrl+C, this is what it shows:
^CTraceback (most recent call last):
  File "/home/hadoop/./udacity_songs.py", line 161, in <module>
    main()
  File "/home/hadoop/./udacity_songs.py", line 156, in main
    process_song_data(spark, input_data, output_data)
  File "/home/hadoop/./udacity_songs.py", line 49, in process_song_d