Skip to content

Flink配置

kalencaya edited this page Sep 16, 2022 · 10 revisions

Flink配置

hadoop

在向 YARN 提交 flink 任务时,需提供 $HADOOP_HOME$HADOOP_CONF_DIR 环境变量,从而 flink-yarn 客户端可以获取到 YARN 和 HDFS 的连接地址。

通过下面的配置可以预先在机器上设置对应的环境变量,flink 脚本 bin/config.shHadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration) 会通过 HADOOP_HOMEHADOOP_CONF_DIR 获取 hadoop 配置信息,向 YARN 提交任务时,flink 可以从 hadoop 配置信息中获取 HDFS 和 YARN 地址。

CoreOptions#FLINK_HADOOP_CONF_DIR
CoreOptions#FLINK_YARN_CONF_DIR
CoreOptions#FLINK_HBASE_CONF_DIR

但是这些配置的定义只是为了自动生成 flink 文档,hadoop 配置环境变量只会读取机器上的,而不会使用 CoreOptions#FLINK_HADOOP_CONF_DIR 配置值。

HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration) 方法实现中,是支持下面 3 个配置项的:

/**
 * Path to hdfs-default.xml file.
 *
 * @deprecated Use environment variable HADOOP_CONF_DIR instead.
 */
@Deprecated public static final String HDFS_DEFAULT_CONFIG = "fs.hdfs.hdfsdefault";

/**
 * Path to hdfs-site.xml file.
 *
 * @deprecated Use environment variable HADOOP_CONF_DIR instead.
 */
@Deprecated public static final String HDFS_SITE_CONFIG = "fs.hdfs.hdfssite";

/**
 * Path to Hadoop configuration.
 *
 * @deprecated Use environment variable HADOOP_CONF_DIR instead.
 */
@Deprecated public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";

因此可以通过上面 3 个配置项动态指定 hadoop 配置。

上面 3 个配置都已经标记为 @Deprecated,如果未来版本中上述配置被移除,可以对 HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration) 方法进行修改,以支持动态获取 CoreOptions#FLINK_HADOOP_CONF_DIR 配置。

kubernetes

在向 Kubernetes 提交 flink 任务时,同样需要提供 Kubernetes 的连接信息。flink-kubernetes 客户端是默认读取 ${HOME}/.kube/config 文件,context 使用默认配置中的默认 context。

通过下面的配置就可以指定 kubeconfig 和 context,从而灵活地支持向多个 Kubernetes 集群提交任务。

KubernetesConfigOptions#KUBE_CONFIG_FILE
KubernetesConfigOptions#CONTEXT

有意思的是下面这个选项,可以让 flink 任务在 Kubernetes 中直接使用存储在 ConfigMap 中的 hadoop 配置文件。

KubernetesConfigOptions#HADOOP_CONF_CONFIG_MAP

log

为了不让系统运行在黑盒状态下,用户无法查看系统内部运行细节的尴尬场景下,可观测性的重要性越来越高。

flink 的 web ui 是可以直接查看 flink 集群节点的运行日志的,这种方式相比流行的 ELK 系统来说,就少了日志检索的功能,另外当集群运行在 YARN 或 Kubernetes 中,集群崩溃日志信息也会跟着丢失。

采集 flink 集群日志信息也成为 flink 基础设施运维的重要一环。

下面 2 个参数只支持对应的环境变量,定义只是为了说明。

CoreOptions#FLINK_LOG_DIR
CoreOptions#FLINK_LOG_MAX

而主要改变 flink 日志输出的手段还是修改位于 $FLINK_HOME/conf 的日志配置文件。

jvm 参数

flink 使用 java 语言实现,运行在 JVM 上,调整 flink 组件在 JVM 上的运行参数是有必要的

CoreOptions#FLINK_JVM_OPTIONS
CoreOptions#FLINK_JM_JVM_OPTIONS
CoreOptions#FLINK_TM_JVM_OPTIONS
CoreOptions#FLINK_HS_JVM_OPTIONS
CoreOptions#FLINK_CLI_JVM_OPTIONS

内存参数

合理分配 flink 在 JVM 上的内存使用可以提高工作负载,减少系统陷入 OOM 的风险。

详细信息参考 Set up Flink’s Process Memory

注意:内存参数的调整也会受到 jvm 参数的影响。

HA 参数

JobManager 高可用配置。flink 提供了 2 中实现:zookeepr 和 kubernetes

HighAvailabilityOptions#HA_MODE
HighAvailabilityOptions#HA_CLUSTER_ID
HighAvailabilityOptions#HA_STORAGE_PATH

文档链接:

State & Fault Tolerance

flink 作为一个有状态的计算引擎,状态存储和轻量级异步快照算法正是实现 Exactly Once 语义的关键。

state backends

State Backend 有 2 种实现:HashMap 和 RocksDB。其中 RocksDB 支持增量 checkpoint。

checkpoint

savepoints

restart

  • fixeddelay
  • failurerate
  • exponentialdelay
RestartStrategyOptions#RESTART_STRATEGY

RestartStrategyOptions#RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS
RestartStrategyOptions#RESTART_STRATEGY_FIXED_DELAY_DELAY

RestartStrategyOptions#RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL
RestartStrategyOptions#RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL
RestartStrategyOptions#RESTART_STRATEGY_FAILURE_RATE_DELAY

RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR

Resource Provider

YARN

Kubernetes

plugins

FileSystem

Local, HDFS, S3, OSS

metrics

prometheus

文档链接: