Stack Overflow Asked by Guru on November 13, 2020
I am doing a POC with Spark and Spark Streaming for my project. All I am doing is reading a file name from a Kafka topic, loading that file from "src/main/resources", and executing the usual "WordCount" frequency application on it.
@KafkaListener(topics = Constants.ABCWordTopic, groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID)
public void processTask(@Payload String fileResourcePath) {
    log.info("ABC Receiving task from WordProducer filepath {} at time {}", fileResourcePath,
            LocalDateTime.now());
    // Spark job (batch alternative, left commented out)
    /*
     * JavaRDD<String> wordRDD =
     *     sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
     * log.info("ABC Map Contents : {}", wordRDD.countByValue().toString());
     * wordRDD.coalesce(1, true).saveAsTextFile("ResultSparklog_" + System.currentTimeMillis());
     */
    // Spark Streaming job
    JavaPairDStream<String, Integer> wordPairStream = streamingContext
            .textFileStream(extractFile(fileResourcePath))
            .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
            .mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);
    wordPairStream.foreachRDD(wordRDD -> {
        // javaFunctions(wordTempRDD).writerBuilder("vocabulary", "words", mapToRow(String.class))
        //     .saveToCassandra();
        log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
        wordRDD.coalesce(1, true)
                .saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
    });
    streamingContext.start(); // non-blocking; directory monitoring begins here
    try {
        streamingContext.awaitTerminationOrTimeout(-1);
    } catch (InterruptedException e) {
        log.error("Terminated streaming context", e);
    }
}
log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
gives '{}' as output.Adding his answer for other folks who may be stuck on this point. From the first glance it looks like it should work, however reading the documentation on Spark here is the conclusion.
The "streamingContext.textFileStream(..)" API does not read static content from any directory. Therefore, it is unable to read the files from the directory or rather process it. It is meant to read streaming data therefore data has to be added or updated in the monitoring directory. Therefore a quick hack from what I have read on the web is to move the files or update the files into the windows directory (Iam using windows 10) once the program execution has begun (i.e StreamingContext.start has executed).
Please note that I was NOT able to get it to work even after trying all those hacks, but given that this is arguably not the right use case for streaming (reading from a folder and processing it is easily achieved with a plain Spark job, which is what the commented-out block in my code demonstrates; see the sketch below), I have to leave it at that.
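For completeness, a self-contained sketch of that plain Spark job alternative (the class name, SparkConf settings, and input directory are my own assumptions for local testing, not from the original post):

import java.util.Arrays;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class BatchWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BatchWordCount").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Unlike textFileStream(), textFile() happily reads files
            // that are already sitting in the directory.
            JavaRDD<String> lines = sc.textFile("C:/tmp/stream-input"); // placeholder input dir
            JavaPairRDD<String, Integer> counts = lines
                    .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                    .mapToPair(word -> new Tuple2<>(word, 1))
                    .reduceByKey((a, b) -> a + b);
            // coalesce(1) writes a single part file, matching the original code's intent
            counts.coalesce(1).saveAsTextFile("ResultSparklog_" + System.currentTimeMillis());
        }
    }
}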
Answered by Guru on November 13, 2020