PHP 7.0 的 mongodb 擴展與舊版 PHP 5.6 的 mongo 擴展完全不兼容,在這裡對 mongodb 擴展的用法做一個簡單的總結。
廢話不多說,直接上代碼。
# 創建mongodb鏈接
/**
 * Build a MongoDB driver manager from a configuration array.
 *
 * Reads the keys host, port, db_name, username, password, read_preference,
 * connect_timeout_ms and socket_timeout_ms from $conf_arr.
 *
 * @param array $conf_arr Connection settings (keys listed above).
 * @return \MongoDB\Driver\Manager|false The manager, or false if constructing it throws.
 */
public function connect($conf_arr) {
    try {
        $conn_str = "mongodb://" . $conf_arr['host'] . ":" . $conf_arr['port'] . "/" . $conf_arr['db_name'];
        $options = array(
            'username' => $conf_arr['username'],
            'password' => $conf_arr['password'],
            'readPreference' => $conf_arr['read_preference'],
            'connectTimeoutMS' => intval($conf_arr['connect_timeout_ms']),
            'socketTimeoutMS' => intval($conf_arr['socket_timeout_ms']),
        );
        // ext-mongodb classes live in the MongoDB\Driver namespace; the leading
        // backslash keeps the name resolvable even if this file declares a namespace.
        return new \MongoDB\Driver\Manager($conn_str, $options);
    } catch (\Exception $e) {
        // Credentials are passed via $options, not the URI, so the message is safe to log.
        error_log(__METHOD__ . ': ' . $e->getMessage());
        return false;
    }
}
# find查詢方法
/**
 * Run a find query and return the matching documents as arrays.
 *
 * @param array  $query      Filter document.
 * @param array  $fields     Field names to include in the result; empty means no projection.
 * @param string $collection Passed unchanged as the namespace argument of executeQuery()
 *                           (presumably "database.collection" - verify against callers).
 * @param array  $sort       Sort specification; ignored when empty.
 * @param int    $limit      Maximum number of documents; ignored when 0.
 * @param int    $skip       Number of documents to skip; ignored when 0.
 * @return array|false List of documents (each shallow-cast to array), or false on failure.
 */
public function find($query = [], $fields = [], $collection, $sort = [], $limit = 0, $skip = 0) {
    // NOTE(review): connect() is declared with a required $conf_arr argument but is
    // called without one here - confirm how the configuration is meant to be supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }

    try {
        $options = [];
        // The projection depends on the requested fields, not on the filter.
        if (!empty($fields)) {
            $options['projection'] = array_fill_keys($fields, 1);
        }
        if (!empty($sort)) {
            $options['sort'] = $sort;
        }
        if (!empty($limit)) {
            $options['limit'] = $limit;
        }
        if (!empty($skip)) {
            $options['skip'] = $skip;
        }

        $mongoQuery = new \MongoDB\Driver\Query($query, $options);
        // Reads are always routed with the "secondary" read preference.
        $readPreference = new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::RP_SECONDARY);
        $cursor = $conn->executeQuery($collection, $mongoQuery, $readPreference);

        $data = [];
        foreach ($cursor as $value) {
            $data[] = (array)$value;
        }
        return $data;
    } catch (\Exception $e) {
        error_log(__METHOD__ . ': ' . $e->getMessage());
    }
    return false;
}
# insert 插入方法
/**
 * Insert a single document.
 *
 * @param array  $add_arr    Document to insert; must be a non-empty array.
 * @param string $collection Passed unchanged as the namespace argument of executeBulkWrite()
 *                           (presumably "database.collection" - verify against callers).
 * @return bool True when the write reports at least one inserted document, false otherwise.
 */
public function insert($add_arr, $collection) {
    if (empty($add_arr) || !is_array($add_arr) || '' == $collection) {
        return false;
    }
    // NOTE(review): connect() is declared with a required $conf_arr argument but is
    // called without one here - confirm how the configuration is meant to be supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }
    try {
        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->insert($add_arr);
        // Majority write concern with a 6000 ms wtimeout.
        $writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 6000);
        $result = $conn->executeBulkWrite($collection, $bulk, $writeConcern);
        if ($result->getInsertedCount()) {
            return true;
        }
    } catch (\Exception $e) {
        error_log(__METHOD__ . ': ' . $e->getMessage());
    }
    return false;
}
# delete 刪除方法
/**
 * Delete documents matching a filter.
 *
 * @param array  $where_arr  Filter document; an empty filter is rejected so a whole
 *                           collection cannot be wiped by accident.
 * @param array  $options    Delete options. The legacy "justOne" flag is accepted and
 *                           translated to the driver's "limit" option; an explicit
 *                           "limit" takes precedence.
 * @param string $collection Passed unchanged as the namespace argument of executeBulkWrite()
 *                           (presumably "database.collection" - verify against callers).
 * @return bool True when the bulk write executed without throwing, false otherwise.
 */
public function delete($where_arr, $options = [], $collection) {
    if (empty($where_arr) || '' == $collection) {
        return false;
    }
    // BulkWrite::delete() understands "limit" (true = first match only), not the
    // old mongo extension's "justOne"; without this mapping justOne is silently
    // ignored and every matching document is removed.
    if (!isset($options['limit'])) {
        $options['limit'] = !empty($options['justOne']);
    }
    unset($options['justOne']);

    // NOTE(review): connect() is declared with a required $conf_arr argument but is
    // called without one here - confirm how the configuration is meant to be supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }
    try {
        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->delete($where_arr, $options);
        // Majority write concern with a 30000 ms wtimeout.
        $writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 30000);
        $conn->executeBulkWrite($collection, $bulk, $writeConcern);
        return true;
    } catch (\Exception $e) {
        error_log(__METHOD__ . ': ' . $e->getMessage());
    }
    return false;
}
# 執行cmd命令操作
/**
 * Execute a raw database command.
 *
 * @param array|object $params Command document.
 * @param string       $dbName Database the command is executed against.
 * @return \MongoDB\Driver\Cursor|false Result of executeCommand(), or false on failure.
 */
public function command($params, $dbName) {
    // NOTE(review): connect() is declared with a required $conf_arr argument but is
    // called without one here - confirm how the configuration is meant to be supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }
    try {
        $cmd = new \MongoDB\Driver\Command($params);
        return $conn->executeCommand($dbName, $cmd);
    } catch (\Exception $e) {
        error_log(__METHOD__ . ': ' . $e->getMessage());
    }
    return false;
}
# distinct方法
/**
 * Return the distinct values of a field via the "distinct" command.
 *
 * @param string $key        Field whose distinct values are requested.
 * @param array  $where      Filter document.
 * @param string $collection Collection name placed in the command document.
 * @param string $dbName     Database to run the command against. command() requires it,
 *                           so the call fails (returns false) when it is omitted.
 * @return array|false The "values" field of the command reply, or false on failure.
 */
public function distinct($key, $where, $collection, $dbName = '') {
    if ('' === (string)$dbName) {
        // command() needs a database name; fail cleanly instead of with an argument error.
        error_log(__METHOD__ . ': no database name given');
        return false;
    }
    try {
        $cmd = [
            'distinct' => $collection,
            'key' => $key,
            'query' => $where,
        ];
        $res = $this->command($cmd, $dbName);
        // command() returns false on failure; calling toArray() on it would be fatal.
        if (!$res) {
            return false;
        }
        $result = $res->toArray();
        return isset($result[0]->values) ? $result[0]->values : false;
    } catch (\Exception $e) {
        error_log(__METHOD__ . ': ' . $e->getMessage());
    }
    return false;
}
# count方法
/**
 * Count the documents matching a filter via the "count" command.
 *
 * @param array  $query      Filter document.
 * @param string $collection Collection name placed in the command document.
 * @param string $dbName     Database to run the command against. command() requires it,
 *                           so the call fails (returns false) when it is omitted.
 * @return int|false The "n" field of the command reply, or false on failure.
 */
public function count($query, $collection, $dbName = '') {
    if ('' === (string)$dbName) {
        // command() needs a database name; fail cleanly instead of with an argument error.
        error_log(__METHOD__ . ': no database name given');
        return false;
    }
    try {
        $cmd = [
            'count' => $collection,
            'query' => $query,
        ];
        $res = $this->command($cmd, $dbName);
        // command() returns false on failure; calling toArray() on it would be fatal.
        if (!$res) {
            return false;
        }
        $result = $res->toArray();
        return isset($result[0]->n) ? $result[0]->n : false;
    } catch (\Exception $e) {
        error_log(__METHOD__ . ': ' . $e->getMessage());
    }
    return false;
}
# aggregate 方法
/**
 * Run a two-stage ($match, $group) aggregation and return its "total" value.
 *
 * Only the "total" field of the first result document is returned, so $group
 * must define an accumulator named "total".
 *
 * @param array  $where      $match stage document.
 * @param array  $group      $group stage document (must produce a "total" field).
 * @param string $collection Collection name placed in the command document.
 * @param string $dbName     Database to run the command against. command() requires it,
 *                           so the call fails (returns false) when it is omitted.
 * @return mixed|null|false The "total" value, null when the pipeline yields no such
 *                          value (e.g. nothing matched), or false on failure.
 */
public function aggregate($where, $group, $collection, $dbName = '') {
    if ('' === (string)$dbName) {
        // command() needs a database name; fail cleanly instead of with an argument error.
        error_log(__METHOD__ . ': no database name given');
        return false;
    }
    try {
        $cmd = [
            'aggregate' => $collection,
            'pipeline' => [
                ['$match' => $where],
                ['$group' => $group],
            ],
            // MongoDB 3.6+ rejects "aggregate" without a cursor option. stdClass is
            // used so it is encoded as an empty document rather than an empty array.
            'cursor' => new \stdClass(),
        ];
        $res = $this->command($cmd, $dbName);
        if (!$res) {
            return false;
        }
        $result = $res->toArray();
        // No result document means nothing matched; avoid an undefined-offset notice.
        return isset($result[0]->total) ? $result[0]->total : null;
    } catch (\Exception $e) {
        error_log(__METHOD__ . ': ' . $e->getMessage());
    }
    return false;
}






