ZooKeeper Overview
ZooKeeper is open-source software that provides distributed coordination services to distributed applications. It exposes a simple set of primitives on which distributed applications can build higher-level services for synchronization, configuration maintenance, group membership, and naming. ZooKeeper is designed to be easy to program against: anyone familiar with a directory-tree file system will find its data model natural. It runs on the JVM and ships with Java and C clients.
Coordination services are notoriously hard to get right; they are especially prone to errors such as race conditions and deadlock. The motivation behind ZooKeeper is to relieve distributed applications of the burden of implementing coordination services from scratch.
Features of ZooKeeper
1. Simple: ZooKeeper allows distributed processes to coordinate with each other through a shared hierarchical namespace, organized much like a standard file system. The namespace consists of data registers (called znodes), which to ZooKeeper look like files and directories. Unlike a typical file system, which is designed for storage, ZooKeeper keeps its data in memory, which lets it achieve high throughput and low latency.
The ZooKeeper implementation puts a premium on high performance, high availability, and strictly ordered access. Its performance lets it be used in large distributed systems; its reliability keeps it from being a single point of failure; and its strict ordering lets clients implement sophisticated synchronization primitives.
2. Replicated: like the distributed processes it coordinates, ZooKeeper itself is replicated over a set of hosts called an ensemble. (Figure: the ZooKeeper service.)
The servers that make up the ZooKeeper service must all know about each other. They maintain an in-memory image of state, along with transaction logs and snapshots in persistent storage. As long as a majority of the servers are available, the ZooKeeper service is available.
A client connects to a single ZooKeeper server. It maintains a TCP connection through which it sends requests, gets responses, receives watch events, and sends heartbeats. If the TCP connection to the server breaks, the client connects to a different server.
3. Ordered: ZooKeeper stamps each update with a number that reflects the order of all ZooKeeper transactions. Subsequent operations can use this ordering to implement higher-level abstractions, such as synchronization primitives.
4. Fast: it is especially fast in read-dominant workloads. ZooKeeper applications run on thousands of machines, and it performs best where reads outnumber writes, at a ratio of around 10:1.
Data model and the hierarchical namespace
The namespace ZooKeeper provides is much like that of a standard file system. A name is a sequence of path elements separated by a slash (/), and every node in the namespace is identified by its path.
Nodes and ephemeral nodes
Unlike a standard file system, every node in the ZooKeeper namespace can have data associated with it as well as children; it is like a file system that allows a file to also be a directory. (ZooKeeper is designed to store coordination data: status information, configuration, location information, and so on, so the data stored at each node is usually small, in the byte-to-kilobyte range.) We use the term znode to make it clear that we are talking about ZooKeeper data nodes.
Each znode maintains a stat structure that includes version numbers for data changes and ACL changes, plus timestamps, allowing cache validation and coordinated updates. Each time a znode's data changes, its version number increases. Whenever a client retrieves data, for example, it also receives the data's version.
Reads and writes of the data stored at each znode in a namespace are atomic. A read gets all the data bytes of a znode, and a write replaces all of them. Each node has an access control list (ACL) that restricts who can do what.
ZooKeeper also has the notion of ephemeral nodes. These znodes exist only as long as the session that created them is active; when the session ends, they are deleted.
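As a minimal sketch of this behavior (assuming a server at localhost:2181 and an existing /workers parent node, both illustrative), creating an ephemeral znode from the Java client looks like this; the node disappears automatically when the session closes:

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class EphemeralExample {
    public static void main(String[] args) throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);
        // 5-second session timeout; the watcher releases the latch once connected.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        connected.await();

        // Ephemeral znode: lives only as long as this session.
        zk.create("/workers/worker-1", "payload".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        zk.close(); // session ends here, so /workers/worker-1 is deleted
    }
}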
Conditional updates and watches
ZooKeeper supports the concept of watches. A client can set a watch on a znode; the watch is triggered and removed when the znode changes. When a watch fires, the client receives a packet saying the znode has changed. And if the connection between the client and a ZooKeeper server breaks, the client receives a local notification.
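A minimal sketch of both ideas, assuming an existing /config znode (illustrative) and omitting the usual wait-for-connect for brevity: the getData call leaves a one-shot watch, and the setData call is a conditional update that only succeeds if the version we read is still current:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class WatchExample {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, null);

        // Read /config and leave a one-shot watch on it.
        Stat stat = new Stat();
        byte[] data = zk.getData("/config", new Watcher() {
            public void process(WatchedEvent event) {
                // Fires once when /config changes; re-register to keep watching.
                System.out.println("watch event: " + event.getType() + " on " + event.getPath());
            }
        }, stat);

        // Conditional update: succeeds only if nobody has modified /config
        // since we read it (compare-and-set on the znode version).
        zk.setData("/config", "new-value".getBytes(), stat.getVersion());

        zk.close();
    }
}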
Guarantees
ZooKeeper is very fast and very simple. Since its goal is to be the basis for more complex services such as synchronization, it provides a set of guarantees:
- Sequential consistency: updates from a client are applied in the order they were sent.
- Atomicity: updates either succeed or fail; there are no partial results.
- Single system image: a client sees the same view of the service regardless of which server it connects to.
- Reliability: once an update has been applied, it persists from that time forward until a client overwrites it.
- Timeliness: the client's view of the system is guaranteed to be up to date within a certain time bound.
Simple API
One of ZooKeeper's design goals is a simple programming interface, so it supports only the following operations (a short client sketch follows the list):
- create: creates a node at a location in the tree
- delete: deletes a node
- exists: tests whether a node exists at a location
- get data: reads the data from a node
- set data: writes data to a node
- get children: retrieves the list of children of a node
- sync: waits for data to be propagated (brings the client's view up to date)
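The sketch below touches each of these operations through the Java client; the host names and the /demo path are illustrative, and the wait-for-connect is again omitted for brevity:

import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ApiTour {
    public static void main(String[] args) throws Exception {
        // A multi-server connect string lets the client fail over between servers.
        ZooKeeper zk = new ZooKeeper("host1:2181,host2:2181,host3:2181", 5000, null);

        zk.create("/demo", "v1".getBytes(),                      // create
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(zk.exists("/demo", false));           // exists (Stat, or null)
        byte[] data = zk.getData("/demo", false, null);          // get data
        System.out.println(new String(data));
        zk.setData("/demo", "v2".getBytes(), -1);                // set data (-1 = any version)
        List<String> children = zk.getChildren("/demo", false);  // get children
        System.out.println(children);
        zk.delete("/demo", -1);                                  // delete
        // sync is asynchronous in the Java API: zk.sync(path, callback, ctx)
        zk.close();
    }
}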
Implementation
The ZooKeeper components diagram shows the high-level building blocks of the ZooKeeper service. With the exception of the request processor, each of the servers that make up the service replicates its own copy of each component.
(Figure: ZooKeeper components.)
The replicated database is an in-memory database containing the entire data tree. For recoverability, updates are logged to disk, and writes are serialized to disk before they are applied to the in-memory database.
Every ZooKeeper server services clients, and a client connects to exactly one server to submit requests. Read requests are serviced from the local replica of each server's database. Write requests, which change the state of the service, are handled by the request processor.
As part of the agreement protocol, all write requests from clients are forwarded to a single server, the ZooKeeper leader. The remaining servers, called followers, receive message proposals from the leader and agree upon message delivery. The messaging layer takes care of replacing a failed leader and syncing all followers with the leader.
ZooKeeper uses a custom atomic messaging protocol. Because the messaging layer is atomic, ZooKeeper can guarantee that local replicas never diverge. When the leader receives a write request, it calculates what the state of the system will be when the write is applied and transforms this into a transaction that captures the new state.
Starting ZooKeeper
Server startup
bin/zkServer.sh start
The start branch of zkServer.sh looks like this:
start)
    echo -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 0
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    then
      if /bin/echo -n $! > "$ZOOPIDFILE"
      then
        sleep 1
        pid=$(cat "${ZOOPIDFILE}")
        if ps -p "${pid}" > /dev/null 2>&1; then
          echo STARTED
        else
          echo FAILED TO START
          exit 1
        fi
      else
        echo FAILED TO WRITE PID
        exit 1
      fi
    else
      echo SERVER DID NOT START
      exit 1
    fi
    ;;
Here, ZOOMAIN is the entry point of the startup program; its class is:
org.apache.zookeeper.server.quorum.QuorumPeerMain
Its main method is:
/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
main delegates to the initialization and run logic:
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
The code above does three things:
1. Parse the configuration file. The default configuration file is config/zookeeper.properties or config/zookeeper.cfg in the parent directory.
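For reference, a minimal quorum configuration might look like the sketch below (all values illustrative). Note that the code above falls back to standalone mode via ZooKeeperServerMain when config.isDistributed() is false, i.e. when no server.N entries are present:

tickTime=2000
initLimit=5
syncLimit=2
dataDir=/var/lib/zookeeper
clientPort=2181
# quorum members; without these entries the server runs standalone
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888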
/**
 * Parse a ZooKeeper configuration file
 * @param path the patch of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    if (dynamicConfigFileStr!=null) {
        try {
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            try {
                dynamicCfg.load(inConfig);
                if (dynamicCfg.getProperty("version") != null) {
                    throw new ConfigException("dynamic file shouldn't have version inside");
                }

                String version = getVersionFromFilename(dynamicConfigFileStr);
                // If there isn't any version associated with the filename,
                // the default version is 0.
                if (version != null) {
                    dynamicCfg.setProperty("version", version);
                }
            } finally {
                inConfig.close();
            }
            setupQuorumPeerConfig(dynamicCfg, false);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        }

        File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
        if (nextDynamicConfigFile.exists()) {
            try {
                Properties dynamicConfigNextCfg = new Properties();
                FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);
                try {
                    dynamicConfigNextCfg.load(inConfigNext);
                } finally {
                    inConfigNext.close();
                }
                boolean isHierarchical = false;
                for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                    String key = entry.getKey().toString().trim();
                    if (key.startsWith("group") || key.startsWith("weight")) {
                        isHierarchical = true;
                        break;
                    }
                }
                lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
            } catch (IOException e) {
                LOG.warn("NextQuorumVerifier is initiated to null");
            }
        }
    }
}
2. Start and schedule the purge task
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
which calls the start method:
/**
 * Validates the purge configuration and schedules the purge task. Purge
 * task keeps the most recent <code>snapRetainCount</code> number of
 * snapshots and deletes the remaining for every <code>purgeInterval</code>
 * hour(s).
 * <p>
 * <code>purgeInterval</code> of <code>0</code> or
 * <code>negative integer</code> will not schedule the purge task.
 * </p>
 *
 * @see PurgeTxnLog#purge(File, File, int)
 */
public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
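The snapRetainCount and purgeInterval values above come from the autopurge settings in the configuration file; a typical snippet (values illustrative):

# keep the 3 most recent snapshots, purge once per hour
autopurge.snapRetainCount=3
autopurge.purgeInterval=1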
As the code shows, the cleanup work starts a Timer. PurgeTask extends TimerTask (a task that can be scheduled by a timer for one-time or repeated execution) and is implemented as follows:
static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occured while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}
run delegates to the purge method:
/**
 * Purges the snapshot and logs keeping the last num snapshots and the
 * corresponding logs. If logs are rolling or a new snapshot is created
 * during this process, these newest N snapshots or any data logs will be
 * excluded from current purging cycle.
 *
 * @param dataDir the dir that has the logs
 * @param snapDir the dir that has the snapshots
 * @param num the number of snapshots to keep
 * @throws IOException
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    retainNRecentSnapshots(txnLog, snaps);
}
It first finds the most recent snapshots, then calls retainNRecentSnapshots to delete everything older:
static void retainNRecentSnapshots(FileTxnSnapLog txnLog, List<File> snaps) {
    // found any valid recent snapshots?
    if (snaps.size() == 0)
        return;
    File snapShot = snaps.get(snaps.size() -1);
    final long leastZxidToBeRetain = Util.getZxidFromName(
            snapShot.getName(), PREFIX_SNAPSHOT);

    class MyFileFilter implements FileFilter{
        private final String prefix;
        MyFileFilter(String prefix){
            this.prefix=prefix;
        }
        public boolean accept(File f){
            if(!f.getName().startsWith(prefix + "."))
                return false;
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            if (fZxid >= leastZxidToBeRetain) {
                return false;
            }
            return true;
        }
    }
    // add all non-excluded log files
    List<File> files = new ArrayList<File>(Arrays.asList(txnLog
            .getDataDir().listFiles(new MyFileFilter(PREFIX_LOG))));
    // add all non-excluded snapshot files to the deletion list
    files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(
            new MyFileFilter(PREFIX_SNAPSHOT))));
    // remove the old files
    for(File f: files)
    {
        System.out.println("Removing file: "+
                DateFormat.getDateTimeInstance().format(f.lastModified())+
                "\t"+f.getPath());
        if(!f.delete()){
            System.err.println("Failed to remove "+f.getPath());
        }
    }
}
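The same cleanup can also be triggered by hand; a minimal sketch calling the purge method shown above directly (both directory paths are illustrative):

import java.io.File;
import org.apache.zookeeper.server.PurgeTxnLog;

public class ManualPurge {
    public static void main(String[] args) throws Exception {
        // Keep the 3 newest snapshots plus the transaction logs they need.
        PurgeTxnLog.purge(new File("/var/lib/zookeeper"),
                new File("/var/lib/zookeeper"), 3);
    }
}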
3. Start the ZooKeeper server
3.1 Standalone startup
/*
 * Start up the ZooKeeper server.
 *
 * @param args the configfile or the port datadir [ticktime]
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
which calls:
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    runFromConfig(config);
}
and then starts the server from the parsed config:
/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 * @throws AdminServerException
 */
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        ZooKeeperServer zkServer = new ZooKeeperServer( txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

        // Start Admin server
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000)
        );
        containerManager.start();

        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
The cnxnFactory.startup(zkServer) call (here on NettyServerCnxnFactory) binds the port and boots the ZooKeeperServer:
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        zks.startdata();
        zks.startup();
    }
}
zks.startup() then starts the session tracker, wires up the request-processor chain, and marks the server as running:

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();

    state = State.RUNNING;
    notifyAll();
}
In standalone mode the chain is PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}
3.2 Cluster startup
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        quorumPeer = new QuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier()!=null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
As the code shows, QuorumPeer's start() and join() methods form the main flow.
QuorumPeer extends ZooKeeperThread, which extends Thread, so QuorumPeer is indirectly a Thread.
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    super.start();
}
3.2.1 First, restore data from disk into the in-memory database
private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch(IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
which delegates to ZKDatabase.loadDataBase(); its PlayBackListener feeds each replayed transaction into addCommittedProposal:
/**
 * load the database from the disk onto memory and also add
 * the transactions to the committedlog in memory.
 * @return the last valid zxid on disk
 * @throws IOException
 */
public long loadDataBase() throws IOException {
    PlayBackListener listener=new PlayBackListener(){
        public void onTxnLoaded(TxnHeader hdr,Record txn){
            Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, txn, hdr.getZxid());
            addCommittedProposal(r);
        }
    };

    long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
    initialized = true;
    return zxid;
}

/**
 * maintains a list of last <i>committedLog</i>
 * or so committed requests. This is used for
 * fast follower synchronization.
 * @param request committed request
 */
public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    try {
        wl.lock();
        if (committedLog.size() > commitLogCount) {
            committedLog.removeFirst();
            minCommittedLog = committedLog.getFirst().packet.getZxid();
        }
        if (committedLog.isEmpty()) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        try {
            request.getHdr().serialize(boa, "hdr");
            if (request.getTxn() != null) {
                request.getTxn().serialize(boa, "txn");
            }
            baos.close();
        } catch (IOException e) {
            LOG.error("This really should be impossible", e);
        }
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null);

        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        committedLog.add(p);
        maxCommittedLog = p.packet.getZxid();
    } finally {
        wl.unlock();
    }
}
3.2.2 Start the NettyServerCnxnFactory and bind the service address
@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    parentChannel = bootstrap.bind(localAddress);
}
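Which ServerCnxnFactory implementation is used is decided by ServerCnxnFactory.createFactory() based on the zookeeper.serverCnxnFactory system property (the NIO factory is the default). A sketch of selecting Netty before startup:

// Must be set before the server starts; NIOServerCnxnFactory is the default.
System.setProperty("zookeeper.serverCnxnFactory",
        "org.apache.zookeeper.server.NettyServerCnxnFactory");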
3.2.3 Leader election
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // if (!getView().containsKey(myid)) {
    //     throw new RuntimeException("My id " + myid + " not in the peer list");
    // }

    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
which calls:
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
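Which branch runs is controlled by the electionAlg setting in the configuration file; an illustrative snippet:

# 3 (FastLeaderElection) is the default; 0 is the legacy UDP-based election
electionAlg=3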
and finally the election method itself (FastLeaderElection.lookForLeader):
/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock.get()){
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if(termPredicate(recvset, new Vote(n.leader,
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     * Only peer epoch is used to check that the votes come
                     * from the same ensemble. This is because there is at
                     * least one corner case in which the ensemble can be
                     * created with inconsistent zxid and election epoch
                     * info. However, given that only one ensemble can be
                     * running at a single point in time and that each
                     * epoch is used only once, using only the epoch to
                     * compare the votes is sufficient.
                     *
                     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                     */
                    outofelection.put(n.sid, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state
                            + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
4. Summary
This article first introduced ZooKeeper, an open-source distributed coordination system, along with its features and typical uses, and then traced its startup paths from the entry point. In standalone mode, startup goes through ZooKeeperServerMain and ultimately calls ZooKeeperServer's startup() method to set up the RequestProcessor chain. In cluster mode, QuorumPeer's start() method is invoked, which likewise ends up in ZooKeeperServer's startup() to set up the RequestProcessor chain, and finally the election algorithm is run to choose a leader.
References:
[1] http://zookeeper.apache.org/doc/r3.4.6/zookeeperOver.html
[2] http://zookeeper.apache.org/doc/r3.4.6/zookeeperStarted.html