Apache Flink JDBC InputFormat throwing java.net.SocketException: Socket closed
I am querying an Oracle database using the Flink DataSet API. I have customised Flink's JDBCInputFormat to return a java.sql.ResultSet, and I need to perform further operations on the ResultSet using Flink operators.
public static void main(String[] args) throws Exception {
    ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    @SuppressWarnings("unchecked")
    DataSource<ResultSet> source = environment.createInput(
            JDBCInputFormat.buildJDBCInputFormat()
                    .setUsername("username")
                    .setPassword("password")
                    .setDrivername("driver_name")
                    .setDBUrl("jdbcUrl")
                    .setQuery("query")
                    .finish(),
            new GenericTypeInfo<ResultSet>(ResultSet.class));
    source.print();
    environment.execute();
}
The following is the customised JDBCInputFormat:
public class JDBCInputFormat extends RichInputFormat<ResultSet, InputSplit> implements ResultTypeQueryable {

    @Override
    public void open(InputSplit inputSplit) throws IOException {
        Class.forName(drivername);
        dbConn = DriverManager.getConnection(dbURL, username, password);
        statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
        resultSet = statement.executeQuery();
    }

    @Override
    public void close() throws IOException {
        if (statement != null) {
            statement.close();
        }
        if (resultSet != null) {
            resultSet.close();
        }
        if (dbConn != null) {
            dbConn.close();
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        isLastRecord = resultSet.isLast();
        return isLastRecord;
    }

    @Override
    public ResultSet nextRecord(ResultSet row) throws IOException {
        if (!isLastRecord) {
            resultSet.next();
        }
        return resultSet;
    }
}
This works for a query that limits the number of rows fetched, e.g. select a, b, c from xyz where rownum <= 10;. When I try to fetch approximately 1 million rows, I get the below exception after a random number of rows has been fetched:
java.sql.SQLRecoverableException: IO Exception: Socket closed
    at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:101)
    at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:199)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:263)
    at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:521)
    at oracle.jdbc.driver.T4CPreparedStatement.fetch(T4CPreparedStatement.java:1024)
    at oracle.jdbc.driver.OracleResultSetImpl.close_or_fetch_from_next(OracleResultSetImpl.java:314)
    at oracle.jdbc.driver.OracleResultSetImpl.next(OracleResultSetImpl.java:228)
    at oracle.jdbc.driver.ScrollableResultSet.cacheRowAt(ScrollableResultSet.java:1839)
    at oracle.jdbc.driver.ScrollableResultSet.isValidRow(ScrollableResultSet.java:1823)
    at oracle.jdbc.driver.ScrollableResultSet.isLast(ScrollableResultSet.java:349)
    at JDBCInputFormat.reachedEnd(JDBCInputFormat.java:98)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketOutputStream.socketWrite0(Native Method)
So in this case, how can I solve this issue?
I don't think it is possible to ship a ResultSet like a regular record. It is a stateful object that internally maintains a connection to the database server. Using a ResultSet as a record that is transferred between Flink operators means it may be serialized, shipped over the network to another machine, deserialized, and handed to a different thread in a different JVM process. This does not work.
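To illustrate the point, a ResultSet is a live cursor over an open connection, not a plain value. A quick check, independent of Flink, shows that the interface does not even extend java.io.Serializable:

```java
import java.io.Serializable;
import java.sql.ResultSet;

public class ResultSetCheck {
    public static void main(String[] args) {
        // ResultSet wraps an open socket and server-side cursor state,
        // so the JDBC API deliberately does not make it Serializable.
        System.out.println(Serializable.class.isAssignableFrom(ResultSet.class));
        // prints: false
    }
}
```

Flink's generic (Kryo-based) serialization may not fail immediately on such a type, but it cannot meaningfully capture the underlying socket and cursor state, which is why the connection breaks once records actually move.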
Depending on the connection, a ResultSet might as well stay on the same machine in the same thread, which might be why it worked for you so far. If you want to query the database from within an operator, you can implement the function as a RichMapPartitionFunction. Otherwise, I'd read the ResultSet in the data source and forward the resulting rows.
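As a minimal sketch of the "forward the resulting rows" approach (a hypothetical helper, not Flink API): copy each row of the open ResultSet into a plain Object[], which is a self-contained record that can safely be serialized and shipped between operators.

```java
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class RowMaterializer {

    // Copies the current row of an open ResultSet into a plain Object[].
    // Unlike the ResultSet itself, the array holds only values and can be
    // serialized and transferred between operators.
    public static Object[] rowToArray(ResultSet rs) throws SQLException {
        ResultSetMetaData meta = rs.getMetaData();
        int columns = meta.getColumnCount();
        Object[] row = new Object[columns];
        for (int i = 0; i < columns; i++) {
            row[i] = rs.getObject(i + 1); // JDBC columns are 1-based
        }
        return row;
    }
}
```

In the input format, nextRecord would then call resultSet.next() and return rowToArray(resultSet) (with the format typed as RichInputFormat<Object[], InputSplit>) instead of returning the ResultSet. Flink's built-in JDBCInputFormat follows the same idea, emitting one value record per row.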