Big Data Hadoop Ecosystem Installation Tutorial (worth bookmarking)
Node information
ip | host | software | role
---|---|---|---
192.168.10.200 | master | hadoop, spark, kafka, zookeeper, hive, mysql | master node
192.168.10.201 | slave1 | hadoop, spark, kafka, zookeeper | worker node
192.168.10.202 | slave2 | hadoop, spark, kafka, zookeeper | worker node
Key CentOS 7 commands
ip addr
[root@master ~]# ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens32: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:2c:ce:34 brd ff:ff:ff:ff:ff:ff
inet 192.168.10.200/24 brd 192.168.10.255 scope global noprefixroute ens32
valid_lft forever preferred_lft forever
inet6 fe80::6f17:1a7:97bd:2adb/64 scope link noprefixroute
valid_lft forever preferred_lft forever
[root@master ~]#
[root@master ~]# vi /etc/sysconfig/network-scripts/ifcfg-ens32
Note: the interface name ens32 comes from the ip addr output above.
The IP configuration is as follows:
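A sketch of a typical static configuration for the master node; the BOOTPROTO, GATEWAY and DNS1 values below are assumptions, adjust them to your own network:
TYPE=Ethernet
BOOTPROTO=static
NAME=ens32
DEVICE=ens32
ONBOOT=yes
IPADDR=192.168.10.200
NETMASK=255.255.255.0
GATEWAY=192.168.10.2   ## assumed gateway
DNS1=192.168.10.2      ## assumed DNS server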
Restart the network service
systemctl restart network
Firewall
## check the firewall status: firewall-cmd --state
[root@localhost ~]# firewall-cmd --state
running
[root@localhost ~]#
## stop the firewall: systemctl stop firewalld.service
[root@localhost ~]# systemctl stop firewalld.service
[root@localhost ~]#
[root@localhost ~]# firewall-cmd --state
not running
[root@localhost ~]#
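Stopping the service only lasts until the next reboot; to keep the firewall off permanently, disable it as well:
systemctl disable firewalld.service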
Install net-tools (provides the netstat command)
yum install net-tools
SecureCRT is slow to connect to CentOS 7
Edit /etc/ssh/sshd_config, change the line #UseDNS yes to UseDNS no,
then restart the sshd service with systemctl restart sshd.
vi /etc/ssh/sshd_config
#PermitUserEnvironment no
#Compression delayed
#ClientAliveInterval 0
#ClientAliveCountMax 3
#ShowPatchLevel no
##UseDNS yes
UseDNS no ## just change UseDNS to no
#PidFile /var/run/sshd.pid
#MaxStartups 10:30:100
#PermitTunnel no
#ChrootDirectory none
#VersionAddendum none
Restart the sshd service
systemctl restart sshd
Install MySQL
Download page: dev.mysql.com/downloads/m…
The download list is long; just pick the RPM Bundle, since every other package can be extracted from it.
Mirror inside China: mirrors.ustc.edu.cn/mysql-ftp/D…
Build a local MySQL yum repository
mkdir /opt/mysql/
tar -xvf mysql-5.7.26-1.el7.x86_64.rpm-bundle.tar -C /opt/mysql/
## make sure an online yum repository is reachable so createrepo can be installed
yum install createrepo
Go into the directory where the MySQL bundle was extracted and run createrepo:
cd /opt/mysql
createrepo ./
vi /etc/yum.repos.d/mysql.repo
[mysql]
name=mysql
baseurl=file:///opt/mysql/
gpgcheck=0
enabled=1
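With the local repo in place, the server can be installed from it; a sketch, assuming the bundle's mysql-community-server package:
yum install -y mysql-community-server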
After MySQL is installed, a my.cnf configuration file is generated under /etc/ (if it is missing, copy one from another machine or create it). According to my.cnf, the MySQL data directory defaults to /var/lib/mysql/.
vi /etc/my.cnf
[mysqld]
## skip password authentication at login
skip-grant-tables
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
Initialize MySQL using that data directory:
mysqld --user=mysql --datadir=/var/lib/mysql --initialize-insecure
systemctl restart mysqld
Change the root password (MySQL 8 vs. older syntax is noted in the comments):
use mysql;
## clear authentication_string first
update user set authentication_string='' where user='root';
## on MySQL 5.6 and earlier:
## update user set password='123456' where user='root';
## on MySQL 5.7 and later:
alter user 'root'@'localhost' identified by '123456';
flush privileges;
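If ALTER USER complains that the server is running with --skip-grant-tables, run flush privileges; first and retry. Once the password is reset, remove the skip-grant-tables line from /etc/my.cnf and restart so that password login is enforced again:
vi /etc/my.cnf   ## delete or comment out skip-grant-tables
systemctl restart mysqld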
Uninstall MySQL:
systemctl stop mysqld
systemctl status mysqld
yum list installed mysql*
yum remove mysql-server
rm -rf /var/lib/mysql
rm -rf /var/log/mysqld.log
rm -rf /var/run/mysqld/mysqld.pid
Common MySQL grant commands:
## create user zhangsan with login password 123456
create user 'zhangsan'@'%' identified by '123456';
grant all privileges on *.* to 'zhangsan'@'%' with grant option;
flush privileges;
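To verify the account, log in from any node with the MySQL client, using the master's IP from the node table:
mysql -h 192.168.10.200 -u zhangsan -p123456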
Hive
hive-env.sh
HADOOP_HOME=/root/apps/hadoop-2.7.7
HIVE_HOME=/root/apps/apache-hive-1.2.1-bin
HIVE_CONF_DIR=/root/apps/apache-hive-1.2.1-bin/conf
hive-site.xml
touch hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- ConnectionURL -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/test?serverTimezone=UTC</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<!-- ConnectionDriverName -->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<!-- ConnectionUserName -->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>Username to use against metastore database</description>
</property>
<!-- ConnectionPassword -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>password to use against metastore database</description>
</property>
<!-- hive.metastore.warehouse.dir -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/root/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
</configuration>
Initialize the metastore schema
$HIVE_HOME/bin/schematool -dbType mysql -initSchema
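schematool assumes the database named in javax.jdo.option.ConnectionURL (test above) already exists; a quick way to create it with the root account configured earlier:
mysql -uroot -p123456 -e "create database if not exists test;"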
Hadoop
Files to configure: hadoop-env.sh, core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml and slaves.
hadoop-env.sh
This file sets JAVA_HOME.
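For example (the JDK path here is the one used later in spark-env.sh; adjust it to your own install):
export JAVA_HOME=/root/apps/jdk1.8.0_77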
core-site.xml
<configuration>
  <!-- set the host of namenode residing -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master:9000</value>
  </property>
  <!-- set the position of dfs directory -->
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hadoop/hdpdata</value>
  </property>
</configuration>
hdfs-site.xml
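A minimal sketch for this two-datanode cluster (the replication factor and directory paths below are assumptions):
<configuration>
  <!-- two datanodes, so keep two replicas at most -->
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <!-- assumed namenode metadata directory -->
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/home/hadoop/hdpdata/name</value>
  </property>
  <!-- assumed datanode block directory -->
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/home/hadoop/hdpdata/data</value>
  </property>
</configuration>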
mapred-site.xml
mv mapred-site.xml.template mapred-site.xml
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
yarn-site.xml
<configuration>
  <!-- hostname of the ResourceManager (the master node) -->
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>master</value>
  </property>
  <!-- how reducers fetch data -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
slaves
slave1
slave2
Format the NameNode
hadoop namenode -format
Start HDFS
## run on master
hadoop-daemon.sh start namenode
## run on slave1
hadoop-daemon.sh start datanode
## run on slave2
hadoop-daemon.sh start datanode
Start everything at once
start-all.sh
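Once everything is up, jps on each node is a quick check; master should show NameNode, SecondaryNameNode and ResourceManager, and the slaves DataNode plus NodeManager:
jps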
Spark
Two files to configure
Rename the template first: in Spark's conf directory, rename spark-env.sh.template to spark-env.sh.
spark-env.sh
export JAVA_HOME=/root/apps/jdk1.8.0_77
export HADOOP_HOME=/root/apps/hadoop-2.7.7
export HADOOP_CONF_DIR=/root/apps/hadoop-2.7.7/etc/hadoop
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_EXECUTOR_MEMORY=512m
## extra setting compared with older versions (Spark 2.2.3 here)
export SPARK_WORKER_INSTANCES=2
## Hive integration settings; skip them if you are not integrating Hive
export HIVE_CONF_DIR=/root/apps/apache-hive-1.2.1-bin/conf
## this line did not take effect; copy the MySQL driver jar into Spark's jars directory instead
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/root/apps/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.6-bin.jar
slaves
slave1
slave2
Integrate with Hive
Copy Hive's hive-site.xml into Spark's conf directory.
Copy the MySQL driver jar into Spark's jars directory.
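To bring up the standalone cluster, run the script under Spark's sbin on master (use the full path, since the name collides with Hadoop's start-all.sh), then point a spark-shell at it as a quick check:
$SPARK_HOME/sbin/start-all.sh
$SPARK_HOME/bin/spark-shell --master spark://master:7077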
Flink
flink-conf.yaml
Non-HA
jobmanager.rpc.address: master
HA
# high-availability mode (required)
high-availability: zookeeper
# JobManager metadata is kept on the file system under storageDir; only a pointer to it is stored in ZooKeeper (required)
high-availability.storageDir: hdfs://master:9000/flink/ha/
# the ZooKeeper quorum is the replicated group of ZooKeeper servers providing the distributed coordination service (required)
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181
masters
master:8081
master2:8081 ## used in HA mode
slaves / workers (file name depends on the Flink version)
slave1
slave2
zoo.cfg
$FLINK_HOME/conf/zoo.cfg, used in HA mode
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
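If you rely on the ZooKeeper quorum bundled with Flink (which reads this zoo.cfg), start it before the cluster with the helper script shipped in Flink's bin directory:
$FLINK_HOME/bin/start-zookeeper-quorum.sh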
Start the cluster
start-cluster.sh
Reference
https://www.cnblogs.com/aibabel/p/10937110.html
Kafka
Edit server.properties
vi $KAFKA_HOME/config/server.properties
broker.id=0 ## must be unique on each broker, e.g. 0 / 1 / 2 on the three nodes
zookeeper.connect=host1:2181,host2:2181,host3:2181
log.dirs=/tmp/kafka-logs
Start the brokers
## start on all three nodes
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties 1>/dev/null 2>&1 &
Start a producer
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic Hellokafka
Start a consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --from-beginning --topic Hellokafka
Create a topic
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic topicname1 --partitions 3 --replication-factor 2
partitions is the number of partitions, replication-factor the number of replicas.
List topics
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list
Describe a topic
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic Hellokafka
Produce data to a topic from the console
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list linux1:9092 --topic topicname
Flume
Installation
flume-env.sh
vi $FLUME_HOME/conf/flume-env.sh
JAVA_HOME=/root/apps/jdk1.8.0_77   ## point this at your own JDK directory
Notes on a Flume startup pitfall:
If starting an agent reports an error about the logger, pass -c to point at the conf directory; log4j.properties lives in conf, and without it the agent cannot find the logger and prints nothing to the console.
Collection configurations
1. Source type: netcat
Install telnet for testing:
yum install telnet-server
yum install telnet.*
Check the xinetd status:
[root@localhost ~]# service xinetd status
xinetd (pid 2967) is running...
Start it: service xinetd start
Connect: telnet <ip> <port>
To quit telnet, press Ctrl+] and then type quit at the telnet prompt.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command (without -c, which triggers the logging pitfall above):
./flume-ng agent -n a1 -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
Command (with -c):
./flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
2. Source type: spooldir
A spooldir source collects whatever files are dropped into a directory.
a1.sources = r1 ## name the agent's source
a1.sinks = k1 ## name the agent's sink
a1.channels = c1 ## name the agent's channel
a1.sources.r1.type = spooldir ## directory source
a1.sources.r1.spoolDir = /root/flume ## the directory to collect from
a1.sources.r1.fileHeader = true ## add a header with the source file path to each event
a1.sinks.k1.type = logger
a1.channels.c1.type = memory ## buffer the data in memory
a1.channels.c1.capacity = 1000 ## maximum number of events held in the channel
a1.channels.c1.transactionCapacity = 100 ## maximum number of events per transaction
a1.sources.r1.channels = c1 ## connect the source to the channel
a1.sinks.k1.channel = c1 ## connect the sink to the channel
Command:
../bin/flume-ng agent -n a1 -f ../conf/spooldir.conf -Dflume.root.logger=INFO,console
3. Source type: avro
With an avro source the agent acts as a server:
it opens port 8088 in order to receive data.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = linux1 ## the hostname/IP of this machine
a1.sources.r1.port = 8088
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command:
../bin/flume-ng agent -n a1 -f ../conf/server.conf -Dflume.root.logger=INFO,console
4. Source type: syslogtcp (TCP socket)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type =syslogtcp
a1.sources.r1.bind=linux1
a1.sources.r1.port=8080
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5. Sink type: avro
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2 ## the hostname/IP of this machine
a1.sources.r1.port = 666
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console
6. Two avro sinks
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666
a1.sources.r2.type = netcat
a1.sources.r2.bind = linux2
a1.sources.r2.port = 777
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux1
a1.sinks.k2.port = 8088
a1.sinks.k2.batch-size = 2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
7. Sink type: hdfs
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = hdfs # sink to HDFS
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
## filePrefix, default: FlumeData
## prefix for file names written to HDFS; Flume's date escapes and %{host} can be used
a1.sinks.k1.hdfs.filePrefix = events-
## default: 30
## how long (in seconds) before the hdfs sink rolls the temporary file into the final target file
## 0 means never roll based on time
# note: "rolling" means the hdfs sink renames the temporary file to the final target file
# and opens a new temporary file for writing
a1.sinks.k1.hdfs.rollInterval = 30
## default: 1024
## roll to the target file once the temporary file reaches this size (in bytes)
## 0 means never roll based on file size
a1.sinks.k1.hdfs.rollSize = 0
## default: 10
## roll to the target file once this many events have been written
## 0 means never roll based on the number of events
a1.sinks.k1.hdfs.rollCount = 0
## batchSize, default: 100
## number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1
## useLocalTimeStamp
## default: false
## whether to use the local time (needed for the date escapes in hdfs.path above)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# generated file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
8. Sink type: kafka
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Hellokafka
a1.sinks.k1.brokerList = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
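To verify the sink, create the Hellokafka topic (see the Kafka section above) and attach a console consumer while typing lines into the netcat source:
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --from-beginning --topic Hellokafka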
9. Source: MySQL
Note:
flume-ng-sql-source-1.4.1.jar and mysql-connector-java-5.1.6-bin.jar must be copied into Flume's lib directory.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
###########sources#################
# r1
a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://localhost:3306/test
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay=10000
a1.sources.r1.status.file.path = /root/data/flume/
a1.sources.r1.status.file.name = sqlSource.status
a1.sources.r1.start.from = 0
a1.sources.r1.custom.query = select id,userName from user where id > $@$ order by id asc
a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size=1
a1.sources.r1.hibernate.c3p0.max_size=10
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
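custom.query above assumes a user table with id and userName columns in the test database; a minimal sketch to create it for testing:
create table if not exists test.user (id int primary key auto_increment, userName varchar(50));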
10. Source: TAILDIR
This source supports resuming from a position file and matching files with regular expressions (available since Flume 1.7).
Example of a file name to match: CDN_FLOW_2020020120-.TXT
# Describe/configure the source
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/flume/.*CDN.*
a1.sources.r1.positionFile= /root/a.json
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = tname
a1.sinks.k1.kafka.bootstrap.servers = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
11. Sink: Spark Streaming
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = master ## the hostname/IP of this machine
a1.sources.r1.port = 666
# describe and configure the sink component k1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# IP address or hostname of the server that aggregates the data, i.e. the node where Flume runs
a1.sinks.k1.hostname=master
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000 # batch size
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console
Notes:
On Flume versions before 1.8 you must replace the scala-library jar under FLUME_HOME/lib (a Scala 2.10.x build) with scala-library-2.11.8.jar from the Spark distribution; Flume 1.9 and later need no replacement.
Download the jar for the dependency below and put it into Flume's lib directory.
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
  <version>2.2.3</version>
</dependency>
Example: Spark Streaming reading from Flume:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>day1_scala</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<!--
this jar goes into Flume's lib directory; it is not used by this Scala program
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>2.2.3</version>
</dependency>
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>
</project>
Scala code:
package a
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object FlumeEventCount {
  def main(args: Array[String]) {
    // local[2]: at least two threads, one for receiving and one for processing
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Milliseconds(3000))
    // pull events from the Spark sink that Flume exposes on master:8888
    val stream = FlumeUtils.createPollingStream(ssc, "master", 8888)
    // print the body of each Flume event as a string
    stream.map(x => new String(x.event.getBody().array())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
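A rough way to run it, assuming the project is packaged with Maven (the jar path below follows the pom above and is hypothetical) and the Flume agent exposing the Spark sink on master:8888 is already running:
spark-submit --class a.FlumeEventCount --master local[2] --packages org.apache.spark:spark-streaming-flume_2.11:2.2.3 target/day1_scala-1.0-SNAPSHOT.jar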
Reposted from: https://juejin.cn/post/7386504498477187099