課程介紹與筆記
1.HDFS shell
1.0查看幫助
hadoop fs -help
1.1上傳
hadoop fs -put <linux上文件><hdfs上的路徑>
1.2查看文件內(nèi)容
hadoop fs -cat <hdfs上的路徑>
1.3查看文件列表
hadoop fs -ls /
1.4下載文件
hadoop fs -get <hdfs上的路徑><linux上文件>
2.使用java接口操作HDFS
見eclipse工程下的demo
3.hadoop通信機制
不同進(jìn)程之間的方法進(jìn)行調(diào)用
4.HDFS源碼分析
FileSystem.get —> 通過反射實例化了一個DistributedFileSystem —> new DFSCilent()把他作為自己的成員變量
在DFSClient構(gòu)造方法里面,調(diào)用了createNamenode,使用了RPC機制,得到了一個NameNode的代理對象,就可以和NameNode進(jìn)行通信了
FileSystem —> DistributedFileSystem —> DFSClient —> NameNode的代理
1.執(zhí)行MR的命令:
hadoop jar <jar在linux的路徑><main方法所在的類的全類名><參數(shù)>
例子:
hadoop jar /root/wc1.jar cn.itcast.d3.hadoop.mr.WordCount hdfs://itcast:9000/words /out2
2.MR執(zhí)行流程
(1).客戶端提交一個mr的jar包給JobClient(提交方式:hadoop jar …)
(2).JobClient通過RPC和JobTracker進(jìn)行通信,返回一個存放jar包的地址(HDFS)和jobId
(3).client將jar包寫入到HDFS當(dāng)中(path = hdfs上的地址 + jobId)
(4).開始提交任務(wù)(任務(wù)的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).JobTracker進(jìn)行初始化任務(wù)
(6).讀取HDFS上的要處理的文件,開始計算輸入分片,每一個分片對應(yīng)一個MapperTask
(7).TaskTracker通過心跳機制領(lǐng)取任務(wù)(任務(wù)的描述信息)
(8).下載所需的jar,配置文件等
(9).TaskTracker啟動一個java child子進(jìn)程,用來執(zhí)行具體的任務(wù)(MapperTask或ReducerTask)
(10).將結(jié)果寫入到HDFS當(dāng)中
1.實現(xiàn)分區(qū)的步驟:
1.1先分析一下具體的業(yè)務(wù)邏輯,確定大概有多少個分區(qū)
1.2首先書寫一個類,它要繼承org.apache.hadoop.mapreduce.Partitioner這個類
1.3重寫public int getPartition這個方法,根據(jù)具體邏輯,讀數(shù)據(jù)庫或者配置返回相同的數(shù)字
1.4在main方法中設(shè)置Partioner的類,job.setPartitionerClass(DataPartitioner.class);
1.5設(shè)置Reducer的數(shù)量,job.setNumReduceTasks(6);
2.排序MR默認(rèn)是按key2進(jìn)行排序的,如果想自定義排序規(guī)則,被排序的對象要實現(xiàn)WritableComparable接口,在compareTo方法中實現(xiàn)排序規(guī)則,然后將這個對象當(dāng)做k2,即可完成排序
3.combiner的作用就是在map端對輸出先做一次合并,以減少傳輸?shù)絩educer的數(shù)據(jù)量。
4.MR啟動流程
start-mapred.sh —> hadoop-daemon.sh —> hadoop —> org.apache.hadoop.mapred.JobTracker
Jobtracker調(diào)用順序:main —> startTracker —> new JobTracker 在其構(gòu)造方法中首先創(chuàng)建一個調(diào)度器,接著創(chuàng)建一個RPC的server(interTrackerServer)tasktracker會通過PRC機制與其通信
然后調(diào)用offerService方法對外提供服務(wù),在offerService方法中啟動RPC server,初始化jobtracker,調(diào)用taskScheduler的start方法 —> eagerTaskInitializationListener調(diào)用start方法,
—> 調(diào)用jobInitManagerThread的start方法,因為其是一個線程,會調(diào)用JobInitManager的run方法 —> jobInitQueue任務(wù)隊列去取第一個任務(wù),然后把它丟入線程池中,然后調(diào)用—>InitJob的run方法
—> jobTracker的initJob方法 —> JobInProgress的initTasks —> maps = new TaskInProgress[numMapTasks]和reduces = new TaskInProgress[numReduceTasks];
TaskTracker調(diào)用順序:main —> new TaskTracker在其構(gòu)造方法中調(diào)用了initialize方法,在initialize方法中調(diào)用RPC.waitForProxy得到一個jobtracker的代理對象
接著TaskTracker調(diào)用了本身的run方法,—> offerService方法 —> transmitHeartBeat返回值是(HeartbeatResponse)是jobTracker的指令,在transmitHeartBeat方法中InterTrackerProtocol調(diào)用了heartbeat將tasktracker的狀態(tài)通過RPC機制發(fā)送給jobTracker,返回值就是JobTracker的指令
heartbeatResponse.getActions()得到具體的指令,然后判斷指令的具體類型,開始執(zhí)行任務(wù)
addToTaskQueue啟動類型的指令加入到隊列當(dāng)中,TaskLauncher又把任務(wù)加入到任務(wù)隊列當(dāng)中,—> TaskLauncher的run方法 —> startNewTask方法 —> localizeJob下載資源 —> launchTaskForJob開始加載任務(wù) —> launchTask —> runner.start()啟動線程; —> TaskRunner調(diào)用run方法 —> launchJvmAndWait啟動java child進(jìn)程
1.上傳zk安裝包
2.解壓
3.配置(先在一臺節(jié)點上配置)
3.1添加一個zoo.cfg配置文件
$ZOOKEEPER/conf
mv zoo_sample.cfg zoo.cfg
3.2修改配置文件(zoo.cfg)
dataDir=/itcast/zookeeper-3.4.5/data
server.1=itcast05:2888:3888
server.2=itcast06:2888:3888
server.3=itcast07:2888:3888
3.3在(dataDir=/itcast/zookeeper-3.4.5/data)創(chuàng)建一個myid文件,里面內(nèi)容是server.N中的N(server.2里面內(nèi)容為2)
echo “1” > myid
3.4將配置好的zk拷貝到其他節(jié)點
scp -r /itcast/zookeeper-3.4.5/ itcast06:/itcast/
scp -r /itcast/zookeeper-3.4.5/ itcast07:/itcast/
3.5注意:在其他節(jié)點上一定要修改myid的內(nèi)容
在itcast06應(yīng)該講myid的內(nèi)容改為2 (echo “6” > myid)
在itcast07應(yīng)該講myid的內(nèi)容改為3 (echo “7” > myid)
4.啟動集群
分別啟動zk
./zkServer.sh start