目錄flinkRichFunction之坑NO原因解決方案flink中richfunction的一點小作用①傳遞參數②傳遞廣播變量總結flinkRichFunction之坑flink的RichMa...
flink RichFunction之坑
flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一個數據庫連接。
在有些情況下他會一直創建然后銷毀,創建銷毀。
舉例: 重點在第三行的注釋
val value = jsenv.socketTextStream("192.168.1編程客棧3.11", 9090) val value2 = value.filter(x => { try { var a = 1 / 0 //此處若沒有異常處理,任務不會斷,但是會重復打開數據庫連接 } catch { case e: Exception => } isInter(x) }).map(fun = x => { x.toLong }) val value1 = value2.assignTimestampsAndwatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } }) try { var a = 1 / 0 } catch { case e: Exception => } value1.map(new mymap) env.execute("test") } def isInter(input: String): Boolean = { val matcher = Pattern.compile("^[0-9]+$").matcher(input) matcher.find() } } class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] { var conn: Connection = _ var pst: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql") println(conn) pst = conn.prepareStatement("i編程客棧nsert into testa 編程客棧(str) values (?)") } override def close(): Unit = { conn.close() pst.close() } override def map(in: ListBuffer[String]): Unit = { pst.setString(1, in.head) pst.execute() } }
所以你是不是覺得那就價格異常處理不就得了?
NO
再看:
這個時候,如果傳進來line不是數字或者格式不對,就會觸發異常,然而此時就不會像上面那樣幫你解決問題,而是一遍遍創建對象銷毀對象,一條消息創建一個連接,我就問你慌不慌,
原因
據觀察是因為,輸入的數據有問題,直接導致
val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } })
這個崩潰了,不走這行代碼了,沒有獲得eventime,然后估計。。。 剩下的我也沒詳細測。。。
解決方案
先fiiter過濾任何可能導致異常的臟數據確保數據都沒問題就可以了。
flink中richfunction的一點小作用
①傳遞參數
所有需要用戶定義的函數都可以轉換成richfunction,例如實現map operator中你需要實現一個內部類,并實現它的map方法:
data.map (new MapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
然后我們可以將其轉換為RichMapFunction:
dataDgcYOJfdUP.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
當然,RichFuction除了提供原來MapFuction的方法之外,還提供open, close, getRuntimeContext 和setRuntimeContext方法,這些功能可用于參數化函數(傳遞參數),創建和完成本地狀態,訪問廣播變量以及訪問運行時信息以及有關迭代中的信息。
下面我們來看看RichFuction中傳遞參數的例子,以下代碼是測試RichFilterFuction的例子,基于DataSet而非DataStream。
由代碼可見,可以將Configuration中的limit參數的值傳遞進RichFuction里面,通過后面withParameters方法傳遞進去,最后的結果是
由此可見,我從configuration中獲取了limit的值,并設定了fliter的閾值是2,從而過濾了1,2。
②傳遞廣播變量
原理和上面差不多,下面我直接把代碼貼出來:
這是目前我學習到的RichFunction的用法,和大家分享一下。
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持我們。
如果認為本文對您有所幫助請贊助本站