Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

r - Transfer data from database to Spark using sparklyr

I have some data in a database, and I want to work with it in Spark, using sparklyr.

I can use a DBI-based package to import the data from the database into R:

dbconn <- dbConnect(<some connection args>)
data_in_r <- dbReadTable(dbconn, "a table") 

and then copy the data from R to Spark using:

sconn <- spark_connect(<some connection args>)
data_ptr <- copy_to(sconn, data_in_r)

Copying twice is slow for big datasets.

How can I copy data directly from the database into Spark?

sparklyr has several spark_read_*() functions for import, but nothing database related. sdf_import() looks like a possibility, but it isn't clear how to use it in this context.


1 Reply


sparklyr >= 0.6.0

You can use spark_read_jdbc(), which reads a table over JDBC directly into Spark without passing through R.
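
As a minimal sketch of such a call, assuming the PostgreSQL JDBC driver is already visible to Spark and using placeholder host, table, and credentials (the helper name read_pg_table is hypothetical, not part of sparklyr):

```r
# Hypothetical helper: read one database table straight into Spark over JDBC.
# Assumes sparklyr >= 0.6.0 and that the JDBC driver is on Spark's classpath.
read_pg_table <- function(sc, name, url, dbtable, user, password) {
  sparklyr::spark_read_jdbc(
    sc,
    name = name,                       # name of the resulting Spark table
    options = list(
      url      = url,                  # e.g. "jdbc:postgresql://host/database"
      dbtable  = dbtable,              # source table in the database
      user     = user,
      password = password,
      driver   = "org.postgresql.Driver"
    )
  )
}

# Usage (placeholder values; needs a live connection from spark_connect()):
# data_ptr <- read_pg_table(sc, "foo", "jdbc:postgresql://host/database",
#                           "table", "scott", "tiger")
```

The returned object is a dplyr-compatible reference to the Spark table, so it can take the place of the result of copy_to() in the question.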

sparklyr < 0.6.0

There may be a more elegant solution, but here is a minimal example using the low-level API:

  • Make sure that Spark has access to the required JDBC driver, for instance by adding its Maven coordinates to spark.jars.packages. With PostgreSQL (adjust the version to the current release) you could add:

    spark.jars.packages org.postgresql:postgresql:9.4.1212
    

    to SPARK_HOME/conf/spark-defaults.conf

  • Load data and register as temporary view:

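    library(sparklyr)  # provides spark_session(), invoke() and %>%

    # sc is an existing connection created with spark_connect()
    name <- "foo"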
    
    spark_session(sc) %>% 
      invoke("read") %>% 
      # JDBC URL and table name
      invoke("option", "url", "jdbc:postgresql://host/database") %>% 
      invoke("option", "dbtable", "table") %>% 
      # Add optional credentials
      invoke("option", "user", "scott") %>%
      invoke("option", "password", "tiger") %>% 
      # Driver class, here for PostgreSQL
      invoke("option", "driver", "org.postgresql.Driver") %>% 
      # Read and register as a temporary view
      invoke("format", "jdbc") %>% 
      invoke("load") %>% 
      # Spark 2.x, registerTempTable in 1.x
      invoke("createOrReplaceTempView", name)
    

    You can pass multiple options at once using an environment:

    invoke("options", as.environment(list(
      user="scott", password="tiger", url="jdbc:..."
    )))
    
  • Load the temporary view with dplyr:

    dplyr::tbl(sc, name)
    
  • Be sure to read about the further JDBC options, in particular partitionColumn, lowerBound, upperBound, and numPartitions, which control how the read is split across parallel queries.

  • For additional details see, for example, "How to use JDBC source to write and read data in (Py)Spark?" and "How to improve performance for slow Spark jobs using DataFrame and JDBC connection?"
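
To make the partitioning options concrete, here is a hedged sketch of a helper that assembles them. The function name partitioned_jdbc_options and the column name "id" are hypothetical; the option names are Spark's own JDBC options. Values are formatted as plain decimal strings so that large bounds are not sent in scientific notation.

```r
# Hypothetical helper: build JDBC options for a partitioned (parallel) read.
partitioned_jdbc_options <- function(url, dbtable, partition_column,
                                     lower_bound, upper_bound, num_partitions,
                                     driver = "org.postgresql.Driver") {
  stopifnot(lower_bound < upper_bound, num_partitions >= 1)
  # Render numbers without scientific notation, e.g. "1000000" not "1e+06".
  as_plain <- function(x) format(x, scientific = FALSE, trim = TRUE)
  list(
    url             = url,
    dbtable         = dbtable,
    driver          = driver,
    # Spark divides the range between the bounds on partitionColumn into
    # numPartitions strides and issues one query per stride. The bounds only
    # set the stride; rows outside them are still read.
    partitionColumn = partition_column,
    lowerBound      = as_plain(lower_bound),
    upperBound      = as_plain(upper_bound),
    numPartitions   = as_plain(num_partitions)
  )
}

opts <- partitioned_jdbc_options(
  "jdbc:postgresql://host/database", "table",
  partition_column = "id", lower_bound = 1, upper_bound = 1000000,
  num_partitions = 8
)
```

With sparklyr >= 0.6.0 a list like opts could be passed as the options argument of spark_read_jdbc(); with the low-level API each entry corresponds to one invoke("option", ...) call in the pipeline shown earlier. The partition column should be numeric and reasonably evenly distributed, otherwise some partitions will carry most of the rows.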

