Apache Flink JDBC InputFormat throwing java.net.SocketException: Socket closed


I am querying an Oracle database using the Flink DataSet API. For this I have customised the Flink JDBCInputFormat to return a java.sql.ResultSet, because I need to perform further operations on the ResultSet using Flink operators.

public static void main(String[] args) throws Exception {
    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);

    @SuppressWarnings("unchecked")
    DataSource<ResultSet> source = environment.createInput(
            JDBCInputFormat.buildJDBCInputFormat()
                    .setUsername("username")
                    .setPassword("password")
                    .setDrivername("driver_name")
                    .setDBUrl("jdbcurl")
                    .setQuery("query")
                    .finish(),
            new GenericTypeInfo<ResultSet>(ResultSet.class));
    source.print();

    environment.execute();
}

The following is the customised JDBCInputFormat:

public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable {

    @Override
    public void open(InputSplit inputSplit) throws IOException {
        Class.forName(drivername);
        dbConn = DriverManager.getConnection(dbURL, username, password);
        statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
        resultSet = statement.executeQuery();
    }

    @Override
    public void close() throws IOException {
        if (statement != null) {
            statement.close();
        }
        if (resultSet != null) {
            resultSet.close();
        }
        if (dbConn != null) {
            dbConn.close();
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        isLastRecord = resultSet.isLast();
        return isLastRecord;
    }

    @Override
    public ResultSet nextRecord(ResultSet row) throws IOException {
        if (!isLastRecord) {
            resultSet.next();
        }
        return resultSet;
    }
}

This works with the query below, which limits the number of rows fetched: SELECT a, b, c FROM xyz WHERE rownum <= 10; But when I try to fetch all the rows, approximately 1 million, I get the exception below after fetching a random number of rows:

java.sql.SQLRecoverableException: Io exception: Socket closed
    at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101)
    at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521)
    at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024)
    at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314)
    at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228)
    at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839)
    at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823)
    at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349)
    at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketOutputStream.socketWrite0(Native Method)

So in this case, how can I solve the issue?

I don't think it is possible to ship a ResultSet like a regular record. It is a stateful object that internally maintains a connection to the database server. Using a ResultSet as a record that is transferred between Flink operators means that it can be serialized, shipped over the network to another machine, deserialized, and handed to a different thread in a different JVM process. That does not work.

Depending on the connection, a ResultSet might stay on the same machine in the same thread, which might be the case that worked for you. If you want to query a database from within an operator, you could implement the function as a RichMapPartitionFunction. Otherwise, I'd read the ResultSet in the data source and forward the resulting rows.
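As a minimal sketch of that last suggestion, assuming each row can be represented as a plain Object[]: the RowCopier class and its method names are hypothetical and are not part of Flink or JDBC. It advances the cursor with next() only and copies the column values out of the ResultSet, so that what leaves the source is plain data rather than the stateful ResultSet itself.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: copies rows out of a ResultSet into plain Object[] records. */
final class RowCopier {

    private RowCopier() {}

    /** Copies the row the cursor currently points at; JDBC column indexes are 1-based. */
    static Object[] copyCurrentRow(ResultSet rs, int columnCount) throws SQLException {
        Object[] row = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
            row[i] = rs.getObject(i + 1);
        }
        return row;
    }

    /** Reads every remaining row, relying on the return value of next() to detect the end. */
    static List<Object[]> readAll(ResultSet rs, int columnCount) throws SQLException {
        List<Object[]> rows = new ArrayList<>();
        while (rs.next()) {
            rows.add(copyCurrentRow(rs, columnCount));
        }
        return rows;
    }
}
```

Inside an input format, the same copy would presumably happen once per nextRecord() call instead of buffering all rows in a list, which matters for a result of around 1 million rows. The emitted record type would then be something Flink can serialize rather than ResultSet.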

