in dev

Python e Flask: criando um webserver para executar scripts e trabalhar com webhooks

A maioria dos serviços de integração continua ou ferramentas de comunicação (Slack) dispões de webhooks para realizar a comunicação entre as diversas partes e orquestração dos processos.

A idea deste post é mostrar como é simples montar um servidor utilizando Python e Flask que receba alguns requests e execute alguns scripts, recentemente utilizei uma aplicação que foi feita pelo Wilson Tayar e pelo Vinicius Quaiato para automatizar o processo de deploy, a aplicação recebe um post quando qualquer mudança na master de um projeto no GitHub, a partir deste post, ela executa uma serie de scripts para realizar o deploy e notificar um grupo do slack que o deploy terminou.

Para este exemplo, vamos precisar instalar o Python e o pip (gerenciador de pacotes do Python), com eles funcionando em nosso computador podemos começar.

Em nossa aplicação, vamos precisar de um modulo chamado Flask (que é o framework responsável por criar o webserver, trabalhar o roteamento e as requisições), para isso vamos no terminal e instalar usando pip:

pip install flask

Outro modulo que vamos precisar é o Requests, modulo que utilizaremos para notificar um webhook (por exemplo do Slack) para avisar que o deploy foi finalizado.

pip install requests

Agora podemos ver a nossa aplicação, ela consiste em 3 métodos principais:

1) index, recebe uma request em /deploy utilizando o verbo HTTP POST:

@app.route("/deploy", methods=['POST'])
def index():
    execute_script("deploy-exemplo-em-powershell")
    notify("deploy das realizado")
    return ""

2) execute_script, executa um script em powershell para realizar a tarefa (no meu GitHub tem alguns exemplos).

def execute_script(script):
    cmd = ["powershell","-ExecutionPolicy", "Bypass", "C:\\deploy\\{0}.ps1".format(script)]

    p = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    out,err = p.communicate()

    if(err):
        raise Exception('Error: ' + str(err))

    return out

3) notify, será responsável para mandar um post para um webhook (exemplo do slack) que notifica alguma aplicação ou serviço que o deploy foi finalizado.

def notify(message):
    payload = {'text' : message}
    r = requests.post(payload_url, data=json.dumps(payload))

A versão final da app ficou da seguinte maneira:

from flask import Flask
from flask import request
import requests
import subprocess
import json
import time

payload_url = 'https://url-que-notificaremos-quando-o-deploy-terminar/'
app = Flask(__name__)

@app.route("/deploy", methods=['POST'])
def index():
    execute_script("deploy-exemplo-em-powershell")
    notify("deploy realizado")
    return ""

def notify(message):
    payload = {'text' : message}
    r = requests.post(payload_url, data=json.dumps(payload))

def execute_script(script):
    cmd = ["powershell","-ExecutionPolicy", "Bypass", "C:\\deploy\\{0}.ps1".format(script)]

    p = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)
    out,err = p.communicate()

    if(err):
        raise Exception('Error: ' + str(err))

    return out

if __name__ == "__main__":
    app.run(port=7500, host='0.0.0.0', debug=True, threaded=True)

Ao executar ela, temos nosso servidor ouvindo na porta 7500, que ao receber um post em /deploy executa um powershell e notifica uma url que o deploy foi finalizado:

$ python app_deploy.py
* Running on http://0.0.0.0:7500/ (Press CTRL+C to quit)
* Restarting with stat

Bom espero que este post seja útil, lembrando que boa parte (quase toda a aplicação que utilizei foi um exemplo com base no que Wilson e o Quaiato fizeram) mas acaba sendo um código muito util para automatizar tarefas 🙂

abs

Rodolfo