"""定义hook的执行
WebHook:
def _if_valid_source(self):
验证是否合法来源的webhook
可以post一个自定义的key,{"sec_code":"my_password"},然后验证这个key是否符合
github的webhook按照github的签名方式来计算:header里有一个'X-Hub-Signature',
然后对payload用预先在github上预留的code进行签名,二者相符则签名通过
自定义的webhook,就直接在post发送的json里加一个{"sec_code":"..."}来验证
def set_fields(self):
设定需要写入的字段,根据webhook的来源不同,这里写入的数据由自已定义并处理
"""
from share.env_conf import WebhookConfig
from .models import WebhookLog
import subprocess
import json
from abc import ABC, abstractmethod
import hmac, hashlib
[docs]class WebhookSourceNotValid(Exception):
pass
[docs]class WebHook(ABC):
"""接受一个请求,然后将必须的几个属性赋值
log = WebHook()
<...验证是否允许的来源...> self.verified = True
log.set_log_fields() 从self.data_dict里取值
log.from_site = ... 无法set_log_fields的项,手动赋值一遍
log.shell_script = '<执行的命令>'
log.save_log() :这样就执行脚本并且保存日志
必须的属性包括:
from_site: 需要知道这个请求来自于何处
shell_script: 需要知道收到请求后去执行哪个脚本
verified: 需要先赋值为True才会保存;验证需要自行处理,因为每个webhook的验证方式不一样
可以考虑post过来的json包括一个{"sec": sec_code},然后每次验证它
Arguments:
object {[type]} -- [description]
Returns:
[type] -- [description]
"""
def __init__(self, request, sec_code=None):
self.sec_code = sec_code
self.request = request
self.shell_script = ''
self.data_dict = self.convert_data_to_dict()
self.webhooklog = self.gen_webhooklog()
self._empty_dict = {"no_key": "no_value"}
self.verified = self._if_valid_source()
@abstractmethod
def _if_valid_source(self):
raise Exception("需要实现验证,确认是否合法来源")
[docs] @abstractmethod
def set_fields(self):
raise Exception('需要给webhooklog的字段赋值')
[docs] def convert_data_to_dict(self):
"""处理get或post请求,获取有用的数据
TODO: 要考虑各种异常
"""
if self.request.method == 'GET':
data = dict(self.request.GET)
return data
elif self.request.method == 'POST':
try:
data = json.loads(self.request.body)
return data
except Exception as e:
print(f"从request.body转data_dict错误---> {e}")
else:
pass
return None
[docs] def gen_webhooklog(self):
"""获取日志各字段的值"""
webhooklog = WebhookLog()
return webhooklog
[docs] def save_log(self):
print(f'verified ===> {self.verified}')
if self.verified:
self.set_fields()
output = self._run_shell_script()
self.webhooklog.save()
return output
return None
def _run_shell_script(self):
try:
output = subprocess.check_output([self.shell_script, ])
return output
except:
print(f'script run error ---> {self.shell_script}')
[docs]class GithubHook(WebHook):
[docs] def set_fields(self):
self.webhooklog.from_site = 'Webhook Project - Github.com'
self.webhooklog.ref = self.data_dict.get("ref", '')
self.webhooklog.before = self.data_dict.get("before", '')
self.webhooklog.after = self.data_dict.get("after", '')
repository = self.data_dict.get("repository", self._empty_dict)
head_commit = self.data_dict.get("head_commit", self._empty_dict)
self.webhooklog.repo_name = repository.get("full_name", '')
self.webhooklog.html_url = repository.get("html_url", '')
self.webhooklog.hooks_url = repository.get("hooks_url", '')
self.webhooklog.commit_message = head_commit.get("message", '')
print(f'---> set_fields done')
def _if_valid_source(self):
sign_from_github = self.request.headers.get('X-Hub-Signature').split('=')[1]
raw = self.request.body
key = self.sec_code.encode('utf-8')
hashed = hmac.new(key, raw, hashlib.sha1)
sign = hashed.hexdigest()
print(f'github sign: {sign_from_github} / local check sign: {sign}')
# 给测试用的sec_code,从环境变量获取
test_sign = str(self.data_dict.get('sec_code', ''))
if sign_from_github == sign or test_sign == self.sec_code:
return True
return False
[docs]class HhxxGitHook(WebHook):
"""用于接受本地git server 在post-receive发过来的请求
:curl -H "Content-Type:application/json" -X POST -d '{"sec_code":""}' <http://site/...>
| post-receive hook script:
| 用git log分别取出当前的head和上一次的head,再取出当前的commt message,转成json发出去
| #!/bin/sh
| # for post commit
| # exec git update-server-info
|
| AFTER=$(git log --pretty=%h |head -1)
| BEFORE=$(git log --pretty=%h |head -2|tail -1)
| COMMIT_MESSAGE=$(git log --pretty=%s |head -1)
| tmp='{"sec_code":"xxxxxx","after":'\"$AFTER\"',"before":'\"$BEFORE\"',"commit_message":'\"$COMMIT_MESSAGE\"'}'
| echo $tmp | curl -i -H "Content-Type:application/json" -X POST -d @- webhook.hhxx.me/webhook/hhxx/
"""
[docs] def set_fields(self):
self.webhooklog.from_site = "hhxx - local git server"
#
self.webhooklog.before = self.data_dict.get("before", '')
self.webhooklog.after = self.data_dict.get("after", '')
self.webhooklog.commit_message = self.data_dict.get("commit_message", '')
# self.webhooklog.commit_message = head_commit.get("message", '')
# self.webhooklog.ref = self.data_dict.get("ref", '')
# repository = self.data_dict.get("repository", self._empty_dict)
# head_commit = self.data_dict.get("head_commit", self._empty_dict)
# self.webhooklog.repo_name = repository.get("full_name", '')
# self.webhooklog.html_url = repository.get("html_url", '')
# self.webhooklog.hooks_url = repository.get("hooks_url", '')
#
print(self.data_dict)
print(self.data_dict.get("extra_info"))
self.webhooklog.extra_info = self.data_dict.get("extra_info", '')
def _if_valid_source(self):
received_code = str(self.data_dict.get('sec_code', ''))
if received_code == self.sec_code:
return True
return False
[docs]class GithubHookFeiProject(WebHook):
[docs] def set_fields(self):
self.webhooklog.from_site = 'Fei Project - Github.com'
self.webhooklog.ref = self.data_dict.get("ref", '')
self.webhooklog.before = self.data_dict.get("before", '')
self.webhooklog.after = self.data_dict.get("after", '')
repository = self.data_dict.get("repository", self._empty_dict)
head_commit = self.data_dict.get("head_commit", self._empty_dict)
self.webhooklog.repo_name = repository.get("full_name", '')
self.webhooklog.html_url = repository.get("html_url", '')
self.webhooklog.hooks_url = repository.get("hooks_url", '')
self.webhooklog.commit_message = head_commit.get("message", '')
print(f'---> set_fields done')
def _if_valid_source(self):
sign_from_github = self.request.headers.get('X-Hub-Signature').split('=')[1]
raw = self.request.body
key = self.sec_code.encode('utf-8')
hashed = hmac.new(key, raw, hashlib.sha1)
sign = hashed.hexdigest()
print(f'github sign: {sign_from_github} / local check sign: {sign}')
# 给测试用的sec_code,从环境变量获取
test_sign = str(self.data_dict.get('sec_code', ''))
if sign_from_github == sign or test_sign == self.sec_code:
return True
return False