Development environment:
- Python 2.7 + Win10
A quick note before we start: accessing YouTube requires a proxy of some sort, which you will need to sort out yourself; a system-wide (global) proxy works best.
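If you prefer not to set a global proxy, requests can also be pointed at one directly, either through the HTTP_PROXY/HTTPS_PROXY environment variables or a proxies argument. Below is a minimal sketch, assuming a local proxy listening on 127.0.0.1:1080 (replace the address with your own setup); note that the script further down does not pass proxies itself, so with this approach you would add the argument to its requests.get calls.

# -*- coding: utf-8 -*-
# Minimal sketch: route requests through a local proxy instead of a global one.
# The address 127.0.0.1:1080 is an assumption -- replace it with your proxy.
import requests

proxies = {
    "http": "http://127.0.0.1:1080",
    "https": "http://127.0.0.1:1080",
}

# Quick connectivity check before running the downloader below.
r = requests.get("https://www.youtube.com", proxies=proxies, timeout=10)
print r.status_code  # 200 means YouTube is reachable through the proxy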
Implementation code:
from bs4 import BeautifulSoup
import lxml
import Queue
import requests
import re,os,sys,random
import threading
import logging
import json,hashlib,urllib
from requests.exceptions import ConnectTimeout,ConnectionError,ReadTimeout,SSLError,MissingSchema,ChunkedEncodingError
reload(sys)
sys.setdefaultencoding('gbk')  # Python 2 hack so GBK-encoded console I/O works on Windows
# Logging setup
logger = logging.getLogger("AppName")
formatter = logging.Formatter('%(asctime)s %(levelname)-5s: %(message)s')
console_handler = logging.StreamHandler(sys.stdout)
console_handler.formatter = formatter
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
q = Queue.Queue()  # queue of [video url, title] pairs to download
page_q = Queue.Queue()  # queue of search-result page numbers to parse
def download(q, x, path):
    # Fetch the hash token required by the video-resolving API
    urlhash = "https://weibomiaopai.com/"
    try:
        html = requests.get(urlhash).text
    except SSLError:
        logger.info(u"Network unstable, retrying")
        html = requests.get(urlhash).text
    reg = re.compile(r'var hash="(.*?)"', re.S)
    result = reg.findall(html)
    hash_v = result[0]
    while True:
        data = q.get()
        url, name = data[0], data[1].strip().replace("|", "")
        file = os.path.join(path, '%s' + ".mp4") % name
        # Resolve the real video URL via the API, with a fallback mirror
        api = "https://steakovercooked.com/api/video/?cached&hash=" + hash_v + "&video=" + url
        api2 = "https://helloacm.com/api/video/?cached&hash=" + hash_v + "&video=" + url
        try:
            res = requests.get(api)
            result = json.loads(res.text)
        except (ValueError, SSLError):
            try:
                res = requests.get(api2)
                result = json.loads(res.text)
            except (ValueError, SSLError):
                q.task_done()
                return False
        vurl = result['url']
        logger.info(u"Downloading: %s" % name)
        try:
            r = requests.get(vurl)
        except SSLError:
            r = requests.get(vurl)
        except MissingSchema:
            q.task_done()
            continue
        try:
            with open(file, 'wb') as f:
                f.write(r.content)
        except IOError:
            # Title is not a valid filename -- fall back to a random placeholder
            name = u'好開心么么噠 %s' % random.randint(1, 9999)
            file = os.path.join(path, '%s' + ".mp4") % name
            with open(file, 'wb') as f:
                f.write(r.content)
        logger.info(u"Download finished: %s" % name)
        q.task_done()
def get_page(keyword, page_q):
    while True:
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:57.0) Gecko/20100101 Firefox/57.0'
        }
        page = page_q.get()
        url = "https://www.youtube.com/results?sp=EgIIAg%253D%253D&search_query=" + keyword + "&page=" + str(page)
        try:
            html = requests.get(url, headers=headers).text
        except (ConnectTimeout, ConnectionError):
            print u"Cannot reach YouTube -- check that your proxy is working"
            os._exit(0)
        # Extract the video ids from the search-result page
        reg = re.compile(r'"url":"/watch\?v=(.*?)","webPageType"', re.S)
        result = reg.findall(html)
        logger.info(u"Page %s" % page)
        for x in result:
            vurl = "https://www.youtube.com/watch?v=" + x
            try:
                res = requests.get(vurl).text
            except (ConnectionError, ChunkedEncodingError):
                logger.info(u"Network unstable, retrying")
                try:
                    res = requests.get(vurl).text
                except SSLError:
                    continue
            # Grab the video title from the <title> tag
            reg2 = re.compile(r"<title>(.*?)YouTube", re.S)
            name = reg2.findall(res)[0].replace("-", "")
            if u'\u4e00' <= keyword <= u'\u9fff':
                # Keyword is Chinese -- keep the original title
                q.put([vurl, name])
            else:
                # Translate the title via iciba (Kingsoft PowerWord)
                logger.info(u"Translating title")
                url_js = "http://www.iciba.com/" + name
                html2 = requests.get(url_js).text
                soup = BeautifulSoup(html2, "lxml")
                try:
                    res2 = soup.select('.clearfix')[0].get_text()
                    title = res2.split("\n")[2]
                except IndexError:
                    title = u'好開心么么噠 %s' % random.randint(1, 9999)
                q.put([vurl, title])
        page_q.task_done()
def main():
    # Prompt for the search keyword and the number of worker threads
    keyword = raw_input(u"Enter a keyword: ").decode("gbk")
    threads = int(raw_input(u"Enter the number of threads (1-10 recommended): "))
    # Create the download directory if it does not exist
    path = 'D:\\youtube\\%s' % keyword
    if os.path.exists(path) == False:
        os.makedirs(path)
    # Parse the search-result pages
    logger.info(u"Parsing search pages")
    for page in range(1, 26):
        page_q.put(page)
    for y in range(threads):
        t = threading.Thread(target=get_page, args=(keyword, page_q))
        t.setDaemon(True)
        t.start()
    page_q.join()
    logger.info(u"%s videos in total" % q.qsize())
    # Download the videos with multiple threads
    logger.info(u"Starting downloads")
    for x in range(threads):
        t = threading.Thread(target=download, args=(q, x, path))
        t.setDaemon(True)
        t.start()
    q.join()
    logger.info(u"All videos downloaded!")
main()
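A note on the download step: download() buffers each video fully in memory via r.content before writing it, which can be heavy for long videos. A minimal alternative sketch using requests' streaming mode (vurl and file mirror the variables inside download(); the 1 MB chunk size is an arbitrary assumption):

# Sketch: stream the response to disk in chunks instead of buffering r.content.
r = requests.get(vurl, stream=True)
with open(file, 'wb') as f:
    for chunk in r.iter_content(chunk_size=1024 * 1024):
        if chunk:  # skip keep-alive chunks
            f.write(chunk)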






