Scala and Spark UDF function


I made a simple UDF to convert or extract some values from a time field in a temp table in Spark. I register the function, but when I call it using SQL it throws a NullPointerException. Below is my function and the process of executing it. I am using Zeppelin. Strangely, this was working yesterday, but it stopped working this morning.

Function

    def convert(time: String): String = {
      // Parse the leading hours and minutes, then format them back as HH:mm
      val sdf = new java.text.SimpleDateFormat("HH:mm")
      val time1 = sdf.parse(time)
      sdf.format(time1)
    }

Register the function

    sqlContext.udf.register("convert", convert _)

Test the function without SQL (this works)

    convert("12:12:12") -> returns 12:12
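For checking the conversion logic in isolation, outside Spark and Zeppelin, a minimal sketch follows. The names `ConvertCheck` and `convertSafe` are hypothetical and not part of the question. Because the `time` column is declared nullable, the sketch also returns `None` for null or unparseable input instead of throwing. Note that the stack trace in this question points at the function lookup rather than at `parse`, so this is a sanity check, not a claimed fix.

```scala
import java.text.SimpleDateFormat
import scala.util.Try

object ConvertCheck {
  // Hypothetical null-safe variant of the question's convert function.
  // Returns None for null input or input that cannot be parsed.
  def convertSafe(time: String): Option[String] =
    Option(time).flatMap { t =>
      // A new formatter per call, since SimpleDateFormat is not thread-safe.
      val sdf = new SimpleDateFormat("HH:mm")
      // parse reads only the leading "HH:mm" and ignores the trailing seconds.
      Try(sdf.format(sdf.parse(t))).toOption
    }

  def main(args: Array[String]): Unit = {
    println(convertSafe("12:12:12"))   // Some(12:12)
    println(convertSafe(null))         // None
    println(convertSafe("not a time")) // None
  }
}
```

If the standalone check passes but the SQL call still fails, the problem is more likely in how the function is registered or looked up than in the conversion itself.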

Testing the function with SQL in Zeppelin fails.

    %sql select convert(time) from temptable limit 10

Structure of temptable

    root
     |-- date: string (nullable = true)
     |-- time: string (nullable = true)
     |-- serverip: string (nullable = true)
     |-- request: string (nullable = true)
     |-- resource: string (nullable = true)
     |-- protocol: integer (nullable = true)
     |-- sourceip: string (nullable = true)

Part of the stack trace I am getting.

    java.lang.NullPointerException
        at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
        at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
        at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
        at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
        at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
        at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)

Use udf instead of defining the function directly:

    import org.apache.spark.sql.functions._

    // Wrap the conversion in a UserDefinedFunction so it can be applied to Columns
    val convert = udf[String, String](time => {
      val sdf = new java.text.SimpleDateFormat("HH:mm")
      val time1 = sdf.parse(time)
      sdf.format(time1)
    })

A UDF's input parameters are Columns (one or more), and its return type is Column.

    case class UserDefinedFunction protected[sql] (
        f: AnyRef,
        dataType: DataType,
        inputTypes: Option[Seq[DataType]]) {

      // Applying the UDF to Columns yields a new Column wrapping a ScalaUDF expression
      def apply(exprs: Column*): Column = {
        Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
      }
    }
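Since `apply` takes Columns and returns a Column, the `udf` value is used through the DataFrame API rather than called on a plain string. The following is a rough sketch of that usage, under some assumptions: it uses `SparkSession` (Spark 2.x or later) instead of the `sqlContext` from the question, runs against a local master, and builds a tiny made-up DataFrame in place of temptable. The object name `ConvertUdfExample` and the output column name `hhmm` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object ConvertUdfExample {
  // Builds a small DataFrame, applies the UDF to its "time" column,
  // and returns the converted values.
  def run(): Seq[String] = {
    // Assumption: a local SparkSession, for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("convert-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for the question's temptable.
    val df = Seq("12:12:12", "08:05:59").toDF("time")

    // Same conversion as in the answer, wrapped with udf.
    val convert = udf[String, String] { time =>
      val sdf = new java.text.SimpleDateFormat("HH:mm")
      sdf.format(sdf.parse(time))
    }

    // The UDF takes a Column and yields a Column.
    val result = df.select(convert(col("time")).as("hhmm"))
      .as[String]
      .collect()
      .toSeq

    spark.stop()
    result
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

A value created with `udf` is not visible to SQL by name; to call it from a `%sql` paragraph it would still need to be registered under a name.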
