Big Data Hadoop Ecosystem Installation Tutorial


Node information

IP              Host    Software                                      Role
192.168.10.200  master  hadoop, spark, kafka, zookeeper, hive, mysql  master node
192.168.10.201  slave1  hadoop, spark, kafka, zookeeper               worker node
192.168.10.202  slave2  hadoop, spark, kafka, zookeeper               worker node

Key CentOS 7 commands

ip addr

[root@master ~]# ip addr
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host 
       valid_lft forever preferred_lft forever
2: ens32: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether 00:0c:29:2c:ce:34 brd ff:ff:ff:ff:ff:ff
    inet 192.168.10.200/24 brd 192.168.10.255 scope global noprefixroute ens32
       valid_lft forever preferred_lft forever
    inet6 fe80::6f17:1a7:97bd:2adb/64 scope link noprefixroute 
       valid_lft forever preferred_lft forever
[root@master ~]# 
[root@master ~]# vi /etc/sysconfig/network-scripts/ifcfg-ens32 

Note: the interface name ens32 is taken from the output of ip addr.

The IP configuration is as follows:
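The original post showed this file only as an image, so the following is a minimal static-IP sketch for the master node; the GATEWAY and DNS1 values are assumptions and must match your own network.

TYPE=Ethernet
BOOTPROTO=static
NAME=ens32
DEVICE=ens32
ONBOOT=yes
IPADDR=192.168.10.200
NETMASK=255.255.255.0
GATEWAY=192.168.10.2   # assumption: adjust to your gateway
DNS1=192.168.10.2      # assumption: adjust to your DNS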

Restart the network service

systemctl restart network

Firewall

## check firewall status: firewall-cmd --state
[root@localhost ~]# firewall-cmd --state 
running
[root@localhost ~]# 

## stop the firewall: systemctl stop firewalld.service
## to keep it off after reboot: systemctl disable firewalld.service
[root@localhost ~]# systemctl stop firewalld.service 
[root@localhost ~]# 
[root@localhost ~]# firewall-cmd --state 
not running
[root@localhost ~]# 

Install net-tools (provides the netstat command)

yum install net-tools
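Once installed, netstat can confirm that a service is actually listening; for example, after HDFS is started later, port 9000 (the NameNode port set in core-site.xml) can be checked like this:

netstat -tnlp | grep 9000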

SecureCRT connects to CentOS 7 too slowly

Edit /etc/ssh/sshd_config, change "#UseDNS yes" to "UseDNS no", then run systemctl restart sshd to restart the sshd service.

vi /etc/ssh/sshd_config
#PermitUserEnvironment no
#Compression delayed
#ClientAliveInterval 0
#ClientAliveCountMax 3
#ShowPatchLevel no
##UseDNS yes
UseDNS no  ## just set UseDNS to no
#PidFile /var/run/sshd.pid
#MaxStartups 10:30:100
#PermitTunnel no
#ChrootDirectory none
#VersionAddendum none

Restart the sshd service

systemctl restart sshd

Install MySQL

Download page: dev.mysql.com/downloads/m…


The download list has many options; just pick the RPM Bundle, since all the individual RPMs can be extracted from it.


Mirror in China: mirrors.ustc.edu.cn/mysql-ftp/D…


Build a local MySQL yum repository

mkdir /opt/mysql/
tar -xvf mysql-5.7.26-1.el7.x86_64.rpm-bundle.tar -C /opt/mysql/

## make sure your yum mirrors are reachable, then install createrepo
yum install createrepo

Go into the directory where the MySQL packages were extracted and run createrepo.

cd /opt/mysql
createrepo ./

vi /etc/yum.repos.d/mysql.repo
[mysql]
name=mysql
baseurl=file:///opt/mysql/
gpgcheck=0
enabled=1
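With the local repo in place, the server can be installed from it. The package name below is an assumption based on the 5.7 community RPM bundle:

yum clean all
yum install -y mysql-community-server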

After MySQL is installed, a my.cnf configuration file is generated under /etc/ (if it is missing, copy one from another machine or create it). According to my.cnf, the MySQL data directory defaults to /var/lib/mysql/.

vi /etc/my.cnf

[mysqld]
## skip password authentication (remove this line once the root password has been reset)
skip-grant-tables
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

Initialize MySQL with that data directory:

mysqld --user=mysql --datadir=/var/lib/mysql --initialize-insecure

systemctl restart mysqld

Change the root password (commands differ between MySQL 5.x and 8.0):

use mysql;
## clear authentication_string
update user set authentication_string='' where user='root';
## on MySQL versions below 8.0 (old password column):
## update user set password=password('123456') where user='root';

## on MySQL 8.0 and above
alter user 'root'@'localhost' identified by '123456';
flush privileges;
## afterwards, remove skip-grant-tables from /etc/my.cnf and restart mysqld
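After restarting without skip-grant-tables, a quick way to verify the new password (assuming the value 123456 set above):

mysql -uroot -p123456 -e "select version();"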

Uninstall MySQL:

systemctl stop mysqld
systemctl status mysqld
yum list installed mysql*
yum remove mysql-server
rm -rf /var/lib/mysql
rm -rf /var/log/mysqld.log
rm -rf /var/run/mysqld/mysqld.pid

Common MySQL grant commands:

## create user zhangsan with login password 123456
create user 'zhangsan'@'%' identified by '123456';

## grant all privileges on every database to zhangsan from any host
grant all privileges on *.* to 'zhangsan'@'%' with grant option;
flush privileges;

Hive

hive-env.sh

HADOOP_HOME=/root/apps/hadoop-2.7.7
HIVE_HOME=/root/apps/apache-hive-1.2.1-bin
HIVE_CONF_DIR=/root/apps/apache-hive-1.2.1-bin/conf

hive-site.xml

touch  hive-site.xml
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <!--  ConnectionURL  -->
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/test?serverTimezone=UTC</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>

  <!-- ConnectionDriverName -->
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.cj.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>


   <!-- ConnectionUserName -->
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>Username to use against metastore database</description>
  </property>


 <!-- ConnectionPassword -->
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
    <description>password to use against metastore database</description>
  </property>

  <!-- hive.metastore.warehouse.dir -->
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/root/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>
</configuration>

Initialize the metastore schema (the MySQL JDBC driver jar must be in ${HIVE_HOME}/lib first):

${HIVE_HOME}/bin/schematool -dbType mysql -initSchema
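A quick sanity check once the schema is initialized (this assumes HDFS is already running, since the warehouse directory lives there):

${HIVE_HOME}/bin/hive -e "show databases;"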

Hadoop

Six files to configure

  1. hadoop-env.sh

    Set JAVA_HOME in this file, as sketched below.
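    A minimal sketch, reusing the JDK path that appears later in the Spark section (adjust to your own JDK location):

    export JAVA_HOME=/root/apps/jdk1.8.0_77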

  2. core-site.xml

    <configuration>  
          
        <!-- host where the NameNode resides -->
        <property>  
            <name>fs.defaultFS</name>  
            <value>hdfs://master:9000</value>  
        </property>      
        <!-- set the position of dfs directory  -->  
        <property>  
            <name>hadoop.tmp.dir</name>  
            <value>/home/hadoop/hdpdata</value>   
        </property>  
    </configuration> 
    
  3. hdfs-site.xml (the original leaves this file empty; see the sketch below)
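    A minimal sketch; the replication factor of 2 is an assumption that matches the two DataNodes in this cluster:

    <configuration>
        <!-- number of block replicas -->
        <property>
            <name>dfs.replication</name>
            <value>2</value>
        </property>
    </configuration>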

  4. mapred-site.xml.template

    mv mapred-site.xml.template mapred-site.xml

    <configuration>  
        <property>  
            <name>mapreduce.framework.name</name>  
            <value>yarn</value>  
        </property>  
    </configuration>  
    
  5. yarn-site.xml

    <configuration>  
         <!-- address of the YARN ResourceManager; the master host of this cluster -->
        <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>master</value>
        </property>
        <!-- how reducers fetch intermediate data -->
        <property>  
            <name>yarn.nodemanager.aux-services</name>  
            <value>mapreduce_shuffle</value>  
        </property>  
    </configuration>  
    
  6. slaves

    slave1
    slave2
    ...
    

Format the NameNode

hadoop namenode -format

Start HDFS

## run on master
hadoop-daemon.sh start namenode
## run on slave1
hadoop-daemon.sh start datanode
## run on slave2
hadoop-daemon.sh start datanode

Start everything

start-all.sh
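If the cluster came up correctly, jps should show roughly the following daemons (names assume a standard Hadoop 2.x deployment):

## on master
jps   ## expect NameNode, SecondaryNameNode, ResourceManager
## on slave1 / slave2
jps   ## expect DataNode, NodeManager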

Spark

Two files to configure

Rename the template: in Spark's conf directory, rename spark-env.sh.template to spark-env.sh.

spark-env.sh

export JAVA_HOME=/root/apps/jdk1.8.0_77
export HADOOP_HOME=/root/apps/hadoop-2.7.7
export HADOOP_CONF_DIR=/root/apps/hadoop-2.7.7/etc/hadoop
export SPARK_MASTER_IP=master
export SPARK_MASTER_PORT=7077
export SPARK_EXECUTOR_MEMORY=512m
## extra configuration used since Spark 2.2.3
export SPARK_WORKER_INSTANCES=2

## Hive integration; skip these lines if you do not integrate with Hive
export HIVE_CONF_DIR=/root/apps/apache-hive-1.2.1-bin/conf
## this setting did not take effect; copy the MySQL driver jar into Spark's jars directory instead
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/root/apps/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.6-bin.jar

slaves

slave1
slave2

Hive integration

Copy Hive's hive-site.xml into Spark's conf directory.

Copy the MySQL driver jar into Spark's jars directory.
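A quick check that Spark can reach the Hive metastore (spark-sql picks up the copied hive-site.xml automatically):

${SPARK_HOME}/bin/spark-sql -e "show databases;"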

Flink

flink-conf.yaml

Non-HA

jobmanager.rpc.address: master

HA

# high-availability mode (required)
high-availability: zookeeper

# JobManager metadata is persisted in the file system under storageDir; only a pointer to that state is kept in ZooKeeper (required)
high-availability.storageDir: hdfs://master:9000/flink/ha/

# the ZooKeeper quorum is a replicated group of ZooKeeper servers providing the distributed coordination service (required)
high-availability.zookeeper.quorum: master:2181,slave1:2181,slave2:2181

masters

master:8081
master2:8081 ## only used with HA

slaves (workers in newer Flink versions)

slave1
slave2

zoo.cfg

$FLINK_HOME/conf/zoo.cfg is only used with HA:

server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888

Start the cluster

start-cluster.sh 

Reference

https://www.cnblogs.com/aibabel/p/10937110.html

Kafka

Edit server.properties

vi $KAFKA_HOME/config/server.properties
broker.id=0  ## must be unique per broker, e.g. 0, 1, 2 across the three nodes
zookeeper.connect=host1:2181,host2:2181,host3:2181
log.dirs=/tmp/kafka-logs

Start the service

## start on all three nodes
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties  1>/dev/null  2>&1  &

Start a producer

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic Hellokafka

Start a consumer

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --from-beginning --topic Hellokafka

Create a topic

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic topicname1 --partitions 3  --replication-factor 2

partitions is the number of partitions; replication-factor is the number of replicas.

List topics

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list

Describe a topic

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic Hellokafka

Produce data to a topic from the console

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list linux1:9092 --topic topicname

Flume

Installation

flume-env.sh

vi $FLUME_HOME/conf/flume-env.sh
JAVA_HOME=<your JDK directory>

A pitfall when starting Flume:

If starting an agent reports an error about the logger, pass -c pointing at the configuration directory; log4j.properties lives in conf, and without it the agent cannot find the logger and cannot print to the console.

Collection configurations

1. Source type: netcat

Install telnet: yum install telnet-server and yum install telnet

Check xinetd status: service xinetd status (e.g. "xinetd (pid 2967) is running...")

Start it: service xinetd start

Connect: telnet <ip> <port>

To exit telnet, press Ctrl+] and then type quit at the telnet prompt.

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command (without -c the agent cannot find log4j.properties; see the pitfall above):
./flume-ng agent -n a1 -f ../conf/netcat.conf -Dflume.root.logger=INFO,console

./flume-ng agent -n a1 -c ../conf  -f ../conf/netcat.conf -Dflume.root.logger=INFO,console
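To test the agent, send a line from another terminal (linux1 and port 666 come from the config above); it should appear in the agent's console output:

telnet linux1 666
hello flume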

2. Source type: spooldir

The spooldir source collects the files dropped into a directory.

a1.sources = r1  ## name the agent's source
a1.sinks = k1   ## name the agent's sink
a1.channels = c1 ## name the agent's channel
a1.sources.r1.type = spooldir  ## directory source
a1.sources.r1.spoolDir = /root/flume  ## directory to collect from
a1.sources.r1.fileHeader = true  ## add a header with the source file path to each event
a1.sinks.k1.type = logger
a1.channels.c1.type = memory  ## buffer events in memory
a1.channels.c1.capacity = 1000  ## maximum number of events the channel can hold
a1.channels.c1.transactionCapacity = 100  ## maximum number of events per transaction
a1.sources.r1.channels = c1  ## connect the source to the channel
a1.sinks.k1.channel = c1  ## connect the sink to the channel
Command: ../bin/flume-ng agent -n a1 -f ../conf/spooldir.conf -Dflume.root.logger=INFO,console

3. Source type: avro

With an avro source the agent acts as a server: it opens port 8088 and receives the data pushed to it.

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro  
a1.sources.r1.bind = linux1 ## the IP/hostname of this machine
a1.sources.r1.port = 8088
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Command: ../bin/flume-ng agent -n a1 -f ../conf/server.conf -Dflume.root.logger=INFO,console

4. Source type: syslogtcp (socket)

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type =syslogtcp
a1.sources.r1.bind=linux1
a1.sources.r1.port=8080

a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5. Sink type: avro

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2  ## the IP/hostname of this machine
a1.sources.r1.port = 666

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console

6. Sink type: two avro sinks

a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = linux2
a1.sources.r1.port = 666

a1.sources.r2.type = netcat
a1.sources.r2.bind = linux2
a1.sources.r2.port = 777

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux1
a1.sinks.k1.port = 8088
a1.sinks.k1.batch-size = 2

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux1
a1.sinks.k2.port = 8088
a1.sinks.k2.batch-size = 2

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2

7. Sink type: hdfs

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666


a1.sinks.k1.type = hdfs #sink to hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
##filePrefix, default: FlumeData
##prefix of the files written to hdfs; Flume date escapes and %{host} can be used
a1.sinks.k1.hdfs.filePrefix = events-
##default: 30
##how many seconds before the hdfs sink rolls the temporary file into the final target file
##0 means never roll based on time
#note: rolling means the hdfs sink renames the temporary file to the final target file
#and opens a new temporary file for writing
a1.sinks.k1.hdfs.rollInterval = 30
##default: 1024
##roll into the target file when the temporary file reaches this size (in bytes)
##0 means never roll based on file size
a1.sinks.k1.hdfs.rollSize = 0
##default: 10
##roll into the target file after this many events
##0 means never roll based on event count
a1.sinks.k1.hdfs.rollCount = 0
##batchSize, default: 100
##number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1
##useLocalTimeStamp
##default: false
##whether to use the local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#file type; default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

8. Sink type: kafka

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux1
a1.sources.r1.port = 666

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = Hellokafka
a1.sinks.k1.brokerList = linux1:9092,linux2:9092,linux3:9092  
a1.sinks.k1.requiredAcks = 1  
a1.sinks.k1.batchSize = 20

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

9. Source type: MySQL (SQL source)

Notes:

flume-ng-sql-source-1.4.1.jar and mysql-connector-java-5.1.6-bin.jar must be copied into Flume's lib directory.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

###########sources#################
# r1
a1.sources.r1.type = org.keedio.flume.source.SQLSource
a1.sources.r1.hibernate.connection.url = jdbc:mysql://localhost:3306/test
a1.sources.r1.hibernate.connection.user = root
a1.sources.r1.hibernate.connection.password = 123456
a1.sources.r1.hibernate.connection.autocommit = true
a1.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.r1.run.query.delay=10000
a1.sources.r1.status.file.path = /root/data/flume/
a1.sources.r1.status.file.name = sqlSource.status
a1.sources.r1.start.from = 0
a1.sources.r1.custom.query = select id,userName from user where id > $@$ order by id asc
a1.sources.r1.batch.size = 1000
a1.sources.r1.max.rows = 1000
a1.sources.r1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.r1.hibernate.c3p0.min_size=1
a1.sources.r1.hibernate.c3p0.max_size=10

a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

10. Source type: TAILDIR

This source supports resuming from where it left off (via a position file) and regular-expression file matching (only supported in newer Flume versions).

Example of a file name matched by the pattern below: CDN_FLOW_2020020120-.TXT

# Describe/configure the source


a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/flume/.*CDN.*
a1.sources.r1.positionFile= /root/a.json

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = tname
a1.sinks.k1.kafka.bootstrap.servers = linux1:9092,linux2:9092,linux3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

11. Sink type: Spark Streaming (SparkSink)

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = master ## the IP/hostname of this machine
a1.sources.r1.port = 666

# describe and configure the sink component k1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# IP or hostname of the node that aggregates the data, i.e. the node where this Flume agent runs
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 2000 # batch size

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

./bin/flume-ng agent -n a1 -f ./conf/client.conf -Dflume.root.logger=INFO,console

Notes:

  1. For Flume versions before 1.8, replace $FLUME_HOME/lib/scala-library-2.10.5.jar with scala-library-2.11.8.jar from the Spark distribution; Flume 1.9 and later do not need this.

  2. Download the jar from the dependency below and put it into Flume's lib directory.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume-sink_2.11</artifactId>
        <version>2.2.3</version>
    </dependency>
    

Example: Spark Streaming reading from Flume:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>day1_scala</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!--
        this jar must be copied into Flume's lib directory; it is not used by this Scala program
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume-sink_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
       -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.2.3</version>
        </dependency>
    </dependencies>
</project>

Scala code:

package a

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object FlumeEventCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    // 3-second micro-batches
    val ssc = new StreamingContext(sparkConf, Milliseconds(3000))
    // poll events from the Flume SparkSink listening on master:8888
    val stream = FlumeUtils.createPollingStream(ssc, "master", 8888)
    // decode each Flume event body as a string and print it
    stream.map(x => new String(x.event.getBody().array())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
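One way to run it, as a sketch: start the Flume agent with the SparkSink first, then submit the application so it can poll master:8888. The --packages coordinate matches the pom above; the jar name is an assumption derived from the pom's artifactId and version.

spark-submit --class a.FlumeEventCount \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.2.3 \
  day1_scala-1.0-SNAPSHOT.jar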
Reposted from: https://juejin.cn/post/7386504498477187099