项目三 Flume 采集日志数据至 hdfs
简介
- Flume 是一个用于收集、聚合和传输大量日志数据的分布式系统。它通常与 Hadoop 生态系统中的 HDFS(Hadoop Distributed File System)结合使用,能够将数据存储到 HDFS 中。
- 通过以下配置,Flume 能够高效、实时地将日志数据从本地目录采集并存储到 HDFS 中,便于后续的数据分析和处理。
- Flume 的配置文件通常包含以下几个部分:sources(数据来源)、sinks(数据去向)、channels(数据通道)。
工作流启动
- 先在/opt/module/flume/conf/job目录下创建一个flume采集数据至hdfs的配置文件
# 切换到job目录
cd /opt/module/flume/conf/job
# 编辑配置文件
vim hdfs.conf
hdfsAgent.sources = hdfsSource
hdfsAgent.sinks = hdfsSinks
hdfsAgent.channels = hdfsChannel
hdfsAgent.sources.hdfsSource.type = spooldir
hdfsAgent.sources.hdfsSource.channels = hdfsChannel
hdfsAgent.sources.hdfsSource.spoolDir = /opt/module/flume/conf/data/hdfs
hdfsAgent.sources.hdfsSource.fileHeader = true
hdfsAgent.sinks.hdfsSinks.type = hdfs
hdfsAgent.sinks.hdfsSinks.hdfs.path = hdfs://master:8020/flume/events/%y-%m-%d/%H%M/%S
hdfsAgent.sinks.hdfsSinks.hdfs.filePrefix = events
hdfsAgent.sinks.hdfsSinks.hdfs.fileSuffix = log
hdfsAgent.sinks.hdfsSinks.hdfs.round = true
hdfsAgent.sinks.hdfsSinks.hdfs.roundValue = 10
hdfsAgent.sinks.hdfsSinks.hdfs.roundUnit = minute
hdfsAgent.sinks.hdfsSinks.hdfs.minBlockReplicas = 1
hdfsAgent.sinks.hdfsSinks.hdfs.rollInterval = 0
hdfsAgent.sinks.hdfsSinks.hdfs.rollSize = 134217728
hdfsAgent.sinks.hdfsSinks.hdfs.rollCount = 0
hdfsAgent.sinks.hdfsSinks.hdfs.idleTimeout = 60
hdfsAgent.sinks.hdfsSinks.hdfs.fileType = DataStream
hdfsAgent.sinks.hdfsSinks.hdfs.useLocalTimeStamp = true
hdfsAgent.channels.hdfsChannel.type = memory
hdfsAgent.channels.hdfsChannel.capacity = 1000
hdfsAgent.channels.hdfsChannel.transactionCapacity = 100
hdfsAgent.sources.hdfsSource.channels = hdfsChannel
hdfsAgent.sinks.hdfsSinks.channel = hdfsChannel
配置文件参数详解
- 数据源(Sources)
hdfsAgent.sources = hdfsSource
这里定义了一个名为 hdfsSource 的数据源。
hdfsAgent.sources.hdfsSource.type = spooldir
数据源的类型是 spooldir,意味着 Flume 将从一个指定的目录中监听并收集文件。
hdfsAgent.sources.hdfsSource.channels = hdfsChannel
该数据源将使用名为 hdfsChannel 的通道来传输数据。
hdfsAgent.sources.hdfsSource.spoolDir = /opt/module/flume/conf/data/hdfs
这是 Flume 监听的目录路径,它会查看这个目录中新增加的文件。
hdfsAgent.sources.hdfsSource.fileHeader = true
这表示 Flume 会在采集的文件中包含文件头信息,通常用于记录元数据。
- 数据去向(Sinks)
hdfsAgent.sinks = hdfsSinks
这里定义了一个名为 hdfsSinks 的数据去向。
hdfsAgent.sinks.hdfsSinks.type = hdfs
数据去向的类型是 HDFS,表示数据将被写入到 HDFS 中。
hdfsAgent.sinks.hdfsSinks.hdfs.path = hdfs://master:8020/flume/events/%y-%m-%d/%H%M/%S
这是数据存储在 HDFS 中的路径格式,包括日期和时间。Flume 会根据采集时间自动在这个路径中创建目录。
hdfsAgent.sinks.hdfsSinks.hdfs.filePrefix = events
输出文件的前缀是 “events”。
hdfsAgent.sinks.hdfsSinks.hdfs.fileSuffix = log
输出文件的后缀是 “.log”。
hdfsAgent.sinks.hdfsSinks.hdfs.round = true
表示启用滚动机制,根据时间将文件进行分割。
hdfsAgent.sinks.hdfsSinks.hdfs.roundValue = 10
每10分钟触发一次数据滚动。
hdfsAgent.sinks.hdfsSinks.hdfs.roundUnit = minute
这里指定了时间单位为分钟。
hdfsAgent.sinks.hdfsSinks.hdfs.minBlockReplicas = 1
最小副本数为 1,表示写入 HDFS 时会有一个数据副本。
hdfsAgent.sinks.hdfsSinks.hdfs.rollInterval = 0
设置为0表示按照时间不会强制滚动,主要通过 round 机制来滚动文件。
hdfsAgent.sinks.hdfsSinks.hdfs.rollSize = 134217728
文件大小达到 128MB 时会进行滚动。
hdfsAgent.sinks.hdfsSinks.hdfs.rollCount = 0
设置为0表示不限制记录数的滚动。
hdfsAgent.sinks.hdfsSinks.hdfs.idleTimeout = 60
如果没有数据流入,经过60秒后文件将被关闭。
hdfsAgent.sinks.hdfsSinks.hdfs.fileType = DataStream
此设置指定文件类型为数据流,意味着数据将以流的形式写入。
hdfsAgent.sinks.hdfsSinks.hdfs.useLocalTimeStamp = true
启用本地时间戳作为文件的时间戳。
- 数据通道(Channels)
hdfsAgent.channels.hdfsChannel.type = memory
通道类型为内存通道,意味着数据在内存中进行传输。
hdfsAgent.channels.hdfsChannel.capacity = 1000
内存通道能容纳最多1000条事件。
hdfsAgent.channels.hdfsChannel.transactionCapacity = 100
每次事务中最多处理100条事件。
创建相关目录
代码语言:shell复制# hdfs上创建/flume/events目录
hadoop fs -mkdir -p /flume/events
# 添加权限
hadoop fs -chmod 777 -R /flume/*
# 创建日志文件路径
mkdir -p /opt/module/flume/conf/data/hdfs
模拟日志生成脚本
- 这个脚本的作用是生成模拟的日志文件,并将其放入指定的目录中,以便于用作数据测试或进行数据采集
- 创建一个用于存放日志文件的目录。
- 生成 5 个模拟的日志文件,每个文件中都包含一条带有时间戳的日志记录。
- 每个日志文件的生成会有 1 秒的间隔,模拟实际的日志生成过程。
# 创建存放脚本的目录
mkdir -p /opt/module/flume/job-shell
# 切换到该目录下
/opt/module/flume/job-shell
# 编辑脚本
vim logData_To_Hdfs
#!/bin/bash
# 设置保存路径
SPOOL_DIR="/opt/module/flume/conf/data/hdfs"
# 创建 spoolDir 路径(如不存在)
mkdir -p "${SPOOL_DIR}"
# 生成 5 个随机日志文件
for i in {1..5}; do
LOG_FILE="${SPOOL_DIR}/logfile_${i}.log"
echo "INFO: This is a simulated log entry $(date '+%Y-%m-%d %H:%M:%S')" > "${LOG_FILE
echo "Generated ${LOG_FILE}"
sleep 1 # 模拟生成的间隔
done
# 添加权限
chmod 777 ./*
flume启动脚本
- 该脚本方便地启动 Flume 任务而不需要手动输入所有命令,也可以确保 Flume 进程在后台持续运行,适合在生产环境中使用。
cd /opt/module/flume/job-shell
vim hdfs
#!/bin/bash
echo " --------启动 master 采集日志数据至HDFS --------"
nohup /opt/module/flume/bin/flume-ng agent -n hdfsAgent -c /opt/module/flume/conf/ -f /opt/module/flume/conf/job/hdfs.conf >/dev/null 2>&1 &
# 添加权限
chmod 777 ./*
启动流程
代码语言:shell复制# 先启动Hadoop所有进程
allstart.sh
# 切换到脚本启动路径下
cd /opt/module/flume/job-shell
# 启动flume采集脚本
hdfs
# 启动日志文件生成脚本
logData_To_Hdfs
- 启动flume采集脚本
- 启动日志文件生成脚本
- 查看其中一个日志文件内容
检测结果
- 命令查看文件采集结果
hadoop fs -ls -R /flume
- 文件系统查看结果
发布者:admin,转转请注明出处:http://www.yc00.com/web/1755072219a5235295.html
评论列表(0条)