-
Notifications
You must be signed in to change notification settings - Fork 103
Flink配置
在向 YARN 提交 flink 任务时,需提供 $HADOOP_HOME
或 $HADOOP_CONF_DIR
环境变量,从而 flink-yarn 客户端可以获取到 YARN 和 HDFS 的连接地址。
通过下面的配置可以预先在机器上设置对应的环境变量,flink 脚本 bin/config.sh
和 HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration)
会通过 HADOOP_HOME
和 HADOOP_CONF_DIR
获取 hadoop 配置信息,向 YARN 提交任务时,flink 可以从 hadoop 配置信息中获取 HDFS 和 YARN 地址。
CoreOptions#FLINK_HADOOP_CONF_DIR
CoreOptions#FLINK_YARN_CONF_DIR
CoreOptions#FLINK_HBASE_CONF_DIR
但是这些配置的定义只是为了自动生成 flink 文档,hadoop 配置环境变量只会读取机器上的,而不会使用 CoreOptions#FLINK_HADOOP_CONF_DIR
配置值。
在 HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration)
方法实现中,是支持下面 3 个配置项的:
/**
* Path to hdfs-default.xml file.
*
* @deprecated Use environment variable HADOOP_CONF_DIR instead.
*/
@Deprecated public static final String HDFS_DEFAULT_CONFIG = "fs.hdfs.hdfsdefault";
/**
* Path to hdfs-site.xml file.
*
* @deprecated Use environment variable HADOOP_CONF_DIR instead.
*/
@Deprecated public static final String HDFS_SITE_CONFIG = "fs.hdfs.hdfssite";
/**
* Path to Hadoop configuration.
*
* @deprecated Use environment variable HADOOP_CONF_DIR instead.
*/
@Deprecated public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
因此可以通过上面 3 个配置项动态指定 hadoop 配置。
上面 3 个配置都已经标记为 @Deprecated
,如果未来版本中上述配置被移除,可以对 HadoopUtils#getHadoopConfiguration(org.apache.flink.configuration.Configuration)
方法进行修改,以支持动态获取 CoreOptions#FLINK_HADOOP_CONF_DIR
配置。
在向 Kubernetes 提交 flink 任务时,同样需要提供 Kubernetes 的连接信息。flink-kubernetes 客户端是默认读取 ${HOME}/.kube/config
文件,context 使用默认配置中的默认 context。
通过下面的配置就可以指定 kubeconfig 和 context,从而灵活地支持向多个 Kubernetes 集群提交任务。
KubernetesConfigOptions#KUBE_CONFIG_FILE
KubernetesConfigOptions#CONTEXT
有意思的是下面这个选项,可以让 flink 任务在 Kubernetes 中直接使用存储在 ConfigMap 中的 hadoop 配置文件。
KubernetesConfigOptions#HADOOP_CONF_CONFIG_MAP
为了不让系统运行在黑盒状态下,用户无法查看系统内部运行细节的尴尬场景下,可观测性的重要性越来越高。
flink 的 web ui 是可以直接查看 flink 集群节点的运行日志的,这种方式相比流行的 ELK 系统来说,就少了日志检索的功能,另外当集群运行在 YARN 或 Kubernetes 中,集群崩溃日志信息也会跟着丢失。
采集 flink 集群日志信息也成为 flink 基础设施运维的重要一环。
下面 2 个参数只支持对应的环境变量,定义只是为了说明。
CoreOptions#FLINK_LOG_DIR
CoreOptions#FLINK_LOG_MAX
而主要改变 flink 日志输出的手段还是修改位于 $FLINK_HOME/conf
的日志配置文件。
flink 使用 java 语言实现,运行在 JVM 上,调整 flink 组件在 JVM 上的运行参数是有必要的
CoreOptions#FLINK_JVM_OPTIONS
CoreOptions#FLINK_JM_JVM_OPTIONS
CoreOptions#FLINK_TM_JVM_OPTIONS
CoreOptions#FLINK_HS_JVM_OPTIONS
CoreOptions#FLINK_CLI_JVM_OPTIONS
合理分配 flink 在 JVM 上的内存使用可以提高工作负载,减少系统陷入 OOM 的风险。
详细信息参考 Set up Flink’s Process Memory
注意:内存参数的调整也会受到 jvm 参数的影响。
JobManager 高可用配置。flink 提供了 2 中实现:zookeepr 和 kubernetes
HighAvailabilityOptions#HA_MODE
HighAvailabilityOptions#HA_CLUSTER_ID
HighAvailabilityOptions#HA_STORAGE_PATH
文档链接:
flink 作为一个有状态的计算引擎,状态存储和轻量级异步快照算法正是实现 Exactly Once 语义的关键。
State Backend 有 2 种实现:HashMap 和 RocksDB。其中 RocksDB 支持增量 checkpoint。
- fixeddelay
- failurerate
- exponentialdelay
RestartStrategyOptions#RESTART_STRATEGY
RestartStrategyOptions#RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS
RestartStrategyOptions#RESTART_STRATEGY_FIXED_DELAY_DELAY
RestartStrategyOptions#RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL
RestartStrategyOptions#RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL
RestartStrategyOptions#RESTART_STRATEGY_FAILURE_RATE_DELAY
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER
RestartStrategyOptions#RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR
Local, HDFS, S3, OSS
prometheus
文档链接:
Welcome to Scaleph wiki!