Scala and Spark UDF function -
I made a simple UDF to convert/extract values from a time field in a temp table in Spark. I register the function, but when I call it using SQL it throws a NullPointerException. Below are the function and the process of executing it. I am using Zeppelin. Strangely, it was working yesterday but stopped working this morning.
Function:

def convert(time: String): String = {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  return sdf.format(time1)
}
Register the function:

sqlContext.udf.register("convert", convert _)
Testing the function without SQL works:

convert("12:12:12")  // returns "12:12"
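As an aside, the reason parsing "12:12:12" against a "HH:mm" pattern works at all is that SimpleDateFormat.parse(String) is lenient and stops reading once the pattern is satisfied; a quick standalone check:

```scala
// SimpleDateFormat.parse(String) consumes only as much of the input as the
// pattern needs, so the trailing ":12" (seconds) is simply ignored.
val sdf = new java.text.SimpleDateFormat("HH:mm")
val parsed = sdf.parse("12:12:12")
println(sdf.format(parsed)) // prints 12:12
```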
Testing the function via SQL in Zeppelin fails:

%sql select convert(time) from temptable limit 10
Structure of temptable:

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: string (nullable = true)
Part of the stack trace I am getting:

java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
Use udf instead of defining a function directly:

import org.apache.spark.sql.functions._

val convert = udf[String, String](time => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
})
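A udf built this way is applied to a Column rather than called by name in SQL. A minimal usage sketch, assuming a hypothetical DataFrame df holding the temptable data (not shown in the question):

```scala
import org.apache.spark.sql.functions._

// df is assumed to be a DataFrame with the same schema as temptable.
// convert is the column-based udf defined above.
val result = df.select(convert(col("time")).as("hhmm"))
result.show(10)
```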
A UDF's input parameters are a Column (or Columns), and its return type is also a Column:
case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  def apply(exprs: Column*): Column = {
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  }
}
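If you still need to call the function from %sql, you can register an anonymous function directly instead of a separately defined method; a sketch, assuming the same sqlContext as in the question:

```scala
// Registering a plain (String => String) lambda under a SQL-visible name.
sqlContext.udf.register("convert", (time: String) => {
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  sdf.format(sdf.parse(time))
})
// Now callable from SQL: select convert(time) from temptable limit 10
```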