Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
386 views
in Technique[技术] by (71.8m points)

filesystems - spark ssc.textFileStream is not streamining any files from directory

I am trying to execute below code using eclipse (with maven conf) with 2 worker and each have 2 core or also tried with spark-submit.

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

And log of that code

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s
Sentences Collected from files []
-------------------------------------------
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms
Time: 1421944033000 ms
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s)
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

The Problem is that, i am not getting data form the file which is in the directory. Please help me.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Try it with another directory and then copy these files to that directory, while the job is running.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...