Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
649 views
in Technique[技术] by (71.8m points)

apache spark - "Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used" on an EMR cluster with 75GB of memory

I'm running a 5 node Spark cluster on AWS EMR each sized m3.xlarge (1 master 4 slaves). I successfully ran through a 146Mb bzip2 compressed CSV file and ended up with a perfectly aggregated result.

Now I'm trying to process a ~5GB bzip2 CSV file on this cluster but I'm receiving this error:

16/11/23 17:29:53 WARN TaskSetManager: Lost task 49.2 in stage 6.0 (TID xxx, xxx.xxx.xxx.compute.internal): ExecutorLostFailure (executor 16 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I'm confused as to why I'm getting a ~10.5GB memory limit on a ~75GB cluster (15GB per 3m.xlarge instance)...

Here is my EMR config:

[
 {
  "classification":"spark-env",
  "properties":{

  },
  "configurations":[
     {
        "classification":"export",
        "properties":{
           "PYSPARK_PYTHON":"python34"
        },
        "configurations":[

        ]
     }
  ]
},
{
  "classification":"spark",
  "properties":{
     "maximizeResourceAllocation":"true"
  },
  "configurations":[

  ]
 }
]

From what I've read, setting the maximizeResourceAllocation property should tell EMR to configure Spark to fully utilize all resources available on the cluster. Ie, I should have ~75GB of memory available... So why am I getting a ~10.5GB memory limit error? Here is the code I'm running:

def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
 spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

Basically, nothing more than windowing and a groupBy to aggregate the data.

It starts with a few of those errors, and towards halting increases in the amount of the same error.

I've tried running spark-submit with --conf spark.yarn.executor.memoryOverhead but that doesn't seem to solve the problem either.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I feel your pain..

We had similar issues of running out of memory with Spark on YARN. We have five 64GB, 16 core VMs and regardless of what we set spark.yarn.executor.memoryOverhead to, we just couldn't get enough memory for these tasks -- they would eventually die no matter how much memory we would give them. And this as a relatively straight-forward Spark application that was causing this to happen.

We figured out that the physical memory usage was quite low on the VMs but the virtual memory usage was extremely high (despite the logs complaining about physical memory). We set yarn.nodemanager.vmem-check-enabled in yarn-site.xml to false and our containers were no longer killed, and the application appeared to work as expected.

Doing more research, I found the answer to why this happens here: http://web.archive.org/web/20190806000138/https://mapr.com/blog/best-practices-yarn-resource-management/

Since on Centos/RHEL 6 there are aggressive allocation of virtual memory due to OS behavior, you should disable virtual memory checker or increase yarn.nodemanager.vmem-pmem-ratio to a relatively larger value.

That page had a link to a very useful page from IBM: https://web.archive.org/web/20170703001345/https://www.ibm.com/developerworks/community/blogs/kevgrig/entry/linux_glibc_2_10_rhel_6_malloc_may_show_excessive_virtual_memory_usage?lang=en

In summary, glibc > 2.10 changed its memory allocation. And although huge amounts of virtual memory being allocated isn't the end of the world, it doesn't work with the default settings of YARN.

Instead of setting yarn.nodemanager.vmem-check-enabled to false, you could also play with setting the MALLOC_ARENA_MAX environment variable to a low number in hadoop-env.sh. This bug report has helpful information about that: https://issues.apache.org/jira/browse/HADOOP-7154

I recommend reading through both pages -- the information is very handy.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...