Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - spark broadcast variable Map giving null value

I am using Java 8 with Spark v2.4.1.

I am trying to use a broadcast variable holding a Map for lookups, as shown below:

Input data:

+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1    |7    |  5  |
|2    |7    |  4  |
|3    |7    |  3  |
|4    |7    |  2  |
|5    |7    |  1  |
+-----+-----+-----+

Expected Output:

+-----+-----+-----+
|code1|code2|code3|
+-----+-----+-----+
|1    |7    |51   |
|2    |7    |41   |
|3    |7    |31   |
|4    |7    |21   |
|5    |7    |11   |
+-----+-----+-----+

My current code, including the different approaches I have tried:

Map<Integer,Integer> lookup_map= new HashMap<>();
lookup_map.put(1,11);
lookup_map.put(2,21);
lookup_map.put(3,31);
lookup_map.put(4,41);
lookup_map.put(5,51);

JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
Broadcast<Map<Integer,Integer>> lookup_mapBcVar = javaSparkContext.broadcast(lookup_map);

Dataset<Row> resultDs= dataDs
  .withColumn("floor_code3", floor(col("code3")))
  .withColumn("floor_code3_int", floor(col("code3")).cast(DataTypes.IntegerType))
  .withColumn("map_code3", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(col("floor_code3_int"))))
  .withColumn("five", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(5)))
  .withColumn("five_lit", lit(((Map<Integer, Integer>)lookup_mapBcVar.getValue()).get(lit(5).cast(DataTypes.IntegerType))));

The output of the current code, printed with:

resultDs.printSchema();                       
resultDs.show();
            
root
 |-- code1: integer (nullable = true)
 |-- code2: integer (nullable = true)
 |-- code3: double (nullable = true)
 |-- floor_code3: long (nullable = true)
 |-- floor_code3_int: integer (nullable = true)
 |-- map_code3: null (nullable = true)
 |-- five: integer (nullable = false)
 |-- five_lit: null (nullable = true)

+-----+-----+-----+-----------+---------------+---------+----+--------+
|code1|code2|code3|floor_code3|floor_code3_int|map_code3|five|five_lit|
+-----+-----+-----+-----------+---------------+---------+----+--------+
|    1|    7|  5.0|          5|              5|     null|  51|    null|
|    2|    7|  4.0|          4|              4|     null|  51|    null|
|    3|    7|  3.0|          3|              3|     null|  51|    null|
|    4|    7|  2.0|          2|              2|     null|  51|    null|
|    5|    7|  1.0|          1|              1|     null|  51|    null|
+-----+-----+-----+-----------+---------------+---------+----+--------+

To recreate the input data:

List<String[]> stringAsList = new ArrayList<>();
stringAsList.add(new String[] { "1","7","5" });
stringAsList.add(new String[] { "2","7","4" });
stringAsList.add(new String[] { "3","7","3" });
stringAsList.add(new String[] { "4","7","2" });
stringAsList.add(new String[] { "5","7","1" });
    
JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext());
JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map((String[] row) -> RowFactory.create(row));

   
StructType schema = DataTypes
  .createStructType(new StructField[] {
    DataTypes.createStructField("code1", DataTypes.StringType, false),
    DataTypes.createStructField("code2", DataTypes.StringType, false),
    DataTypes.createStructField("code3", DataTypes.StringType, false)
  });

Dataset<Row> dataDf= sparkSession.sqlContext().createDataFrame(rowRDD, schema).toDF();

    
Dataset<Row> dataDs = dataDf
  .withColumn("code1", col("code1").cast(DataTypes.IntegerType))
  .withColumn("code2", col("code2").cast(DataTypes.IntegerType))
  .withColumn("code3", col("code3").cast(DataTypes.IntegerType));

What am I doing wrong here?

A Scala notebook showing the same problem is here:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1165111237342523/3062033079132966/7035720262824085/latest.html


1 Reply


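lit() returns a Column, but Map.get needs an actual Integer key. In your code, col("floor_code3_int") and lit(5).cast(...) are Column objects, not row values, so the lookup runs once on the driver, matches no key, and returns null; lit(null) then gives a column of nulls. To look up each row's value, do the lookup inside a per-row function, for example like this (Scala):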

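    import java.util
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.sql.DataFrame
    import spark.implicits._ // needed for toDF; assumes a SparkSession named spark

    val df: DataFrame = spark.sparkContext.parallelize(Range(0, 10000), 4).toDF("sentiment")
    // Driver-side lookup table, shipped to the executors via broadcast
    val map = new util.HashMap[Int, Int]()
    map.put(1, 1)
    map.put(2, 2)
    map.put(3, 3)
    val bf: Broadcast[util.HashMap[Int, Int]] = spark.sparkContext.broadcast(map)
    // The lookup now runs once per row, with a plain Int key
    df.rdd.map(x => {
      val num = x.getInt(0)
      (num, bf.value.get(num)) // keys missing from the map come back as 0 (null unboxed to Int)
    }).toDF("key", "add_key").show(false)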
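The reason for the nulls in the question can be reproduced without Spark. Below is a minimal sketch in plain Java, under stated assumptions: FakeColumn is a hypothetical stand-in for Spark's Column (an object that names a column but holds no row value), and the class and method names are made up for illustration. It shows that Map.get accepts any Object, so passing a column-like object compiles but finds nothing, while looking up each row's Integer value works.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BroadcastLookupSketch {

    // Hypothetical stand-in for Spark's Column: it names a column
    // but carries no per-row value.
    static final class FakeColumn {
        final String name;
        FakeColumn(String name) { this.name = name; }
    }

    // Same contents as lookup_map in the question.
    static Map<Integer, Integer> buildLookup() {
        Map<Integer, Integer> m = new HashMap<>();
        m.put(1, 11);
        m.put(2, 21);
        m.put(3, 31);
        m.put(4, 41);
        m.put(5, 51);
        return m;
    }

    // Map.get is declared as get(Object), so any key type compiles.
    // A key that equals none of the Integer keys simply yields null.
    static Integer lookup(Map<Integer, Integer> map, Object key) {
        return map.get(key);
    }

    // Per-row lookup: the shape of work a UDF or a map over rows performs.
    static List<Integer> lookupPerRow(Map<Integer, Integer> map, List<Integer> rowValues) {
        List<Integer> out = new ArrayList<>();
        for (Integer v : rowValues) {
            out.add(map.get(v));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> map = buildLookup();
        // Column-like object as key: no match, like map_code3 and five_lit.
        System.out.println(lookup(map, new FakeColumn("floor_code3_int"))); // null
        // Plain integer key: matches, like the "five" column.
        System.out.println(lookup(map, 5)); // 51
        // One lookup per row value, in the order of code3 in the input.
        System.out.println(lookupPerRow(map, Arrays.asList(5, 4, 3, 2, 1))); // [51, 41, 31, 21, 11]
    }
}
```

In Spark the per-row step would have to run on the executors, for example through a UDF that reads the broadcast value, or through a map over the rows as in the Scala code above; this sketch only mirrors the lookup logic.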
