
Error: KafkaStorageException: Too many open files

Problem description: a Flink cluster job both produces to and consumes from Kafka as part of a big-data processing pipeline. When the Flink job hits an exception, it recovers via its configured restart strategy and checkpointing. After the project went live, more and more devices came online, Kafka topics were created dynamically for them, and as the topic count kept growing, Flink processing began to fail.
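
For reference, the restart and checkpoint behavior mentioned above is typically configured along these lines. This is a minimal sketch of flink-conf.yaml settings; the interval and retry values are illustrative assumptions, not taken from the original job:

# Illustrative flink-conf.yaml fragment (values assumed, not from the original job)
execution.checkpointing.interval: 60 s        # take a checkpoint every 60 seconds
restart-strategy: fixed-delay                 # restart the job automatically on failure
restart-strategy.fixed-delay.attempts: 3      # give up after 3 failed restarts
restart-strategy.fixed-delay.delay: 10 s      # wait 10 seconds between restarts

The failing checkpoint raised the following exception: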

java.io.IOException: Could not perform checkpoint 87 for operator Sink: Unnamed (34/90)#99.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 87 for operator Sink: Unnamed (34/90)#99. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
	... 22 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1429)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1117)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1014)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:102)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:345)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1122)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
	... 33 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation

The producer timeout above was only a symptom: the in-flight batches expired after 120000 ms because a broker could no longer accept writes. One server in the Kafka cluster had gone down, with the following error in its log:

[2022-08-01 14:55:22,453] ERROR Error while writing to checkpoint file /home/kafka-logs/fan_sink_29-1/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /home/kafka-logs/topic_min/leader-epoch-checkpoint.tmp (Too many open files)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
	at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:94)
	at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:92)
	at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:70)
	at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:292)
	at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:61)
	at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1(Log.scala:1368)
	at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1$adapted(Log.scala:1367)
	at scala.Option.foreach(Option.scala:437)
	at kafka.log.Log.maybeAssignEpochStartOffset(Log.scala:1367)
	at kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:592)
	at kafka.cluster.Partition.makeLeader(Partition.scala:547)
	at kafka.server.ReplicaManager.$anonfun$makeLeaders$5(ReplicaManager.scala:1568)
	at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
	at scala.collection.mutable.HashMap$Node.foreachEntry(HashMap.scala:633)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:499)
	at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1566)
	at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1411)
	at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:258)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:171)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
	at java.lang.Thread.run(Thread.java:748)
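
Before raising any limits, it is worth confirming that the broker really is running out of file descriptors. A minimal check, assuming the standard kafka.Kafka launcher class; the PID and counts will differ per host:

# Locate the broker PID (assumes the standard kafka.Kafka main class)
KAFKA_PID=$(pgrep -f kafka.Kafka | head -n 1)
# Current soft/hard limits on open files for that process
grep "open files" /proc/$KAFKA_PID/limits
# Number of file descriptors the broker holds right now
ls /proc/$KAFKA_PID/fd | wc -l

If the descriptor count sits near the soft limit, this error is expected: every log segment, index file, and client socket costs the broker one descriptor, so a steadily growing number of topics and partitions eventually exhausts the default quota (often 1024 or 4096).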

The fix is as follows:

//Find files or directories containing "kafka", to locate kafka.service

[root@server ~]# cd /

[root@server /]# find / -name "*kafka*"

/etc/systemd/system/kafka.service

[root@server /]# cd /etc/systemd/system/
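
As a shortcut, systemctl can also print the loaded unit file directly, without scanning the whole filesystem; the first line of its output is a comment naming the unit file's path:

# Print the unit file and its path (assumes the service is named "kafka")
systemctl cat kafka
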
//Edit the config to raise the open-file limit

[root@server system]# vi kafka.service

#Raise the maximum number of open files (this line goes in the [Service] section)
LimitNOFILE=65535
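
For context, the edited unit file ends up looking roughly like this; a sketch only, since the ExecStart line and the rest of the file depend on how Kafka was installed:

# Sketch of kafka.service after the edit (other directives omitted).
# The ExecStart path is an assumed install location, for illustration only.
[Service]
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
LimitNOFILE=65535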

[root@server system]# systemctl daemon-reload

//Restart Kafka

[root@server system]# systemctl stop kafka

[root@server system]# systemctl start kafka
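
Once the service is back, systemd itself can confirm that the new limit took effect (the transcript below double-checks the same thing by hand via /proc). Note that for services started by systemd, limits from /etc/security/limits.conf do not apply; the unit's LimitNOFILE is what counts:

# Ask systemd for the effective limit (assumes the service is named "kafka")
systemctl show kafka -p LimitNOFILE
# Expected output: LimitNOFILE=65535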

//Check the Kafka process

[root@server system]# ps -ef | grep kafka

The Kafka process ID found here is 19694.

[root@server system]# cat /proc/19694/limits
Limit                     Soft Limit           Hard Limit           Units     
Max cpu time              unlimited            unlimited            seconds   
Max file size             unlimited            unlimited            bytes     
Max data size             unlimited            unlimited            bytes     
Max stack size            8388608              unlimited            bytes     
Max core file size        0                    unlimited            bytes     
Max resident set          unlimited            unlimited            bytes     
Max processes             2062355              2062355              processes 
Max open files            65535                65535                files     
Max locked memory         65536                65536                bytes     
Max address space         unlimited            unlimited            bytes     
Max file locks            unlimited            unlimited            locks     
Max pending signals       2062355              2062355              signals   
Max msgqueue size         819200               819200               bytes     
Max nice priority         0                    0                    
Max realtime priority     0                    0                    
Max realtime timeout      unlimited            unlimited            us

Max open files has now changed to 65535.

At this point, the "Too many open files" problem is fully resolved.
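
If 65535 itself proves too low as topics keep multiplying, the order of magnitude needed can be estimated from the broker's data directory; the path below is taken from the error log above:

# Count segment, index, and checkpoint files under the Kafka log dirs
find /home/kafka-logs -type f | wc -l

Each of these files can hold a descriptor while its partition is live, on top of network sockets, so the limit should comfortably exceed this count.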

Copyright notice
This article was written by [甘苦]; please include a link to the original when reposting. Thanks.
https://blog.csdn.net/learnworm/article/details/126106548
