Stack Overflow Asked by Fahad Rana on September 6, 2020
We are trying to read a large number of XML files and run an XQuery on them in PySpark, for example the books XML. We are using the spark-xml-utils library.
Reference answer: Calling scala code in pyspark for XSLT transformations
The XQuery processor is defined as follows, where xquery is the XQuery expression as a string:
proc = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(xquery)
We are reading the files in a directory using:
sc.wholeTextFiles("xmls/test_files")
This gives us an RDD containing all the files as a list of tuples:
[ (Filename1,FileContentAsAString), (Filename2,File2ContentAsAString) ]
The XQuery evaluates and gives us results if we run it directly on the string (FileContentAsAString):
whole_files = sc.wholeTextFiles("xmls/test_files").collect()
proc.evaluate(whole_files[1][1])
# Prints proper xquery result for that file
Problem:
If we try to run proc.evaluate() on the RDD using a lambda function, it fails.
test_file = sc.wholeTextFiles("xmls/test_files")
test_file.map(lambda x: proc.evaluate(x[1])).collect()
# Should give us a list of xquery results
Error:
PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
These functions somehow work, but not the evaluate call above:
Print the content the XQuery is applied to:
test_file.map(lambda x: x[1]).collect()
# Outputs the content; with x[0] it gives the list of filenames
Return the number of characters in the contents:
test_file.map(lambda x: len(x[1])).collect()
# Output: [15274, 13689, 13696]
Books example for reference:
books_xquery = """for $x in /bookstore/book
where $x/price>30
return $x/title/data()"""
proc_books = sc._jvm.com.elsevier.spark_xml_utils.xquery.XQueryProcessor.getInstance(books_xquery)
books_xml = sc.wholeTextFiles("xmls/books.xml")
books_xml.map(lambda x: proc_books.evaluate(x[1])).collect()
# Error
# I can share the stacktrace if you guys want
Unfortunately it is not possible to call a Java/Scala library directly within a map call from Python code. This answer gives a good explanation of why there is no easy way to do this. In short, the reason is that the Py4J gateway (which is necessary to "translate" the Python calls into the JVM world) only lives on the driver node, while the map calls you are trying to execute run on the executor nodes.
One way around that problem would be to wrap the XQuery function in a Scala UDF (explained here), but it would still be necessary to write a few lines of Scala code.
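For a rough idea of what that could look like from the PySpark side, here is a minimal sketch. It assumes a hypothetical Scala/Java class com.example.XQueryUDF (not part of spark-xml-utils, you would have to write it yourself) that implements org.apache.spark.sql.api.java.UDF1, builds its own XQueryProcessor on the executor, and is packaged in a JAR available on the cluster:
# Sketch only: com.example.XQueryUDF is a placeholder class written in Scala/Java.
# It must create its own XQueryProcessor, since Py4J objects cannot be shipped to executors.
from pyspark.sql.functions import expr, input_file_name
from pyspark.sql.types import StringType

spark.udf.registerJavaFunction("xquery_eval", "com.example.XQueryUDF", StringType())

# one row per file; column "value" holds the whole file content
df = spark.read.text("xmls/test_files", wholetext=True)
df.select(input_file_name().alias("file"),
          expr("xquery_eval(value)").alias("result")).show(truncate=False)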
EDIT: If you are able to switch from XQuery to XPath, a probably easier option is to change the library. ElementTree is an XML library written in Python that also supports (a subset of) XPath.
The code
import xml.etree.ElementTree as ET

# one (filename, content) pair per file
xmls = spark.sparkContext.wholeTextFiles("xmls/test_files")
xpathquery = "...your query..."

(xmls.flatMap(lambda x: ET.fromstring(x[1]).findall(xpathquery))  # all matching elements per file
     .map(lambda x: x.text)                                       # element -> its text
     .foreach(print))
would print all results of running the xpathquery against all documents loaded from the directory xmls/test_files.
First, a flatMap is used because the findall call returns a list of all matching elements within each document; flatMap flattens these lists (the result might contain more than one element per file). In the second step, map converts each element to its text in order to get readable output.
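For reference, the books example from the question could be written in the same style. This is only a sketch: ElementTree's limited XPath dialect does not support numeric predicates such as price>30, so that filter is done in plain Python instead:
import xml.etree.ElementTree as ET

books_xml = spark.sparkContext.wholeTextFiles("xmls/books.xml")

titles = (books_xml
          .flatMap(lambda x: ET.fromstring(x[1]).findall(".//book"))        # all <book> elements
          .filter(lambda b: float(b.findtext("price", default="0")) > 30)   # price filter in Python
          .map(lambda b: b.findtext("title"))
          .collect())
print(titles)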
Answered by werner on September 6, 2020