Własny inteligentny dom #3 – oprogramowanie i światełko w tunelu

Poprzednio przygotowaliśmy sobie wszystko pod programowanie i przejście do części właściwej. W tym wpisie zajmiemy się oprogramowaniem, które da nam możliwość kierowania urządzeniami. 

Do stworzenia tego systemu użyjemy języka Python, który będzie odpowiedzialny za obsługę zapytań HTTP. Zbudujemy prosty serwer, który nie będzie miał za dużo zabezpieczeń, bo pokażę tylko sposób działania. Do tego ograniczony będzie tylko do sieci lokalnej.

Zaczynamy

Zacznijmy od potrzebnych bibliotek.

import time
import BaseHTTPServer
import json
import subprocess
  1. Biblioteka, która przyda się przy wypisywaniu w konsoli czasu wykonania zapytania.
  2. Obsługa protokołu HTTP.
  3. W formacie JSON zwracać będziemy status każdego z obsługiwanych pinów na RaspberryPi.
  4. Dzięki temu będziemy wykonywali komendy, które służą do obsługi pinów – tak, wiem, że jest biblioteka do obsługi tego, ale akurat tak było mi wygodniej.

Skoro mamy już biblioteki to nadszedł czas na deklarację najważniejszych rzeczy w projekcie. Nasze zmienne, będą przechowywały hostname, port, piny oraz operacje jakie są dostępne.

HOST_NAME = '10.0.10.4'
PORT_NUMBER = 5000
# [ID, PIN, ON/OFF, ACTIVE/DISACTIVE]
PINS = [
    [0, 8, False, True, 'White LED'],
    [1, 9, False, True, 'Color LED'],
    [2, 7, False, False, '---'],
    [3, 2, False, False, '---'],
    [4, 3, False, False, '---'],
    [5, 12, False, False, '---'],
    [6, 13, False, False, '---'],
    [7, 14, False, False, '---']
]

OPERATIONS = ['on', 'off', 'toggle', 'get', 'index']

Proste rozwiązanie, ale na ten moment musi nam wystarczyć. W trakcie rozwijania projektu stworzę konfigurator, w którym będzie można ustawić nazwę i sposób obsługi danego pinu.

Pozostawmy początek i przejdźmy dalej. Czas na obsługę zapytań. Stwórzmy klasę, która zajmie się wszystkim czego potrzebujemy.

class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):

    def do_HEAD(self):
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Respond to a GET request."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

        operation = self.path[1:].split('/')
        url_arguments = len(operation)

        if url_arguments is not 0:
            if operation[0] in OPERATIONS:
                # ON
                if operation[0] == OPERATIONS[0]:
                    if url_arguments == 2:
                        PINS[int(operation[1])][2] = True
                        subprocess.check_output(['gpio', 'mode', str(PINS[int(operation[1])][1]), 'out'])
                        self.wfile.write("on-" + operation[1])
                    else:
                        self.wfile.write("error-nu")
                # OFF
                if operation[0] == OPERATIONS[1]:
                    if url_arguments == 2:
                        PINS[int(operation[1])][2] = False
                        subprocess.check_output(['gpio', 'mode', str(PINS[int(operation[1])][1]), 'in'])
                        self.wfile.write("off-" + operation[1])
                    else:
                        self.wfile.write("error-nu")
                # TOGGLE
                if operation[0] == OPERATIONS[2]:
                    if url_arguments == 2:
                        PINS[int(operation[1])][2] = not PINS[int(operation[1])][2]
                        tmp = ''
                        if PINS[int(operation[1])][2]:
                            tmp = 'out'
                        else:
                            tmp = 'in'

                        subprocess.check_output(['gpio', 'mode', str(PINS[int(operation[1])][1]), tmp])
                        self.wfile.write("toggle-" + operation[1])
                    else:
                        self.wfile.write("error-nu")
                # GET
                if operation[0] == OPERATIONS[3]:
                    if url_arguments == 1:
                        self.wfile.write(json.dumps(PINS))
                    elif url_arguments == 2:
                        self.wfile.write(json.dumps(PINS[int(operation[1])]))
                    else:
                        self.wfile.write("error-nu")
                # INDEX
                if operation[0] == OPERATIONS[4]:
                    with open('index.html', 'r') as myFile:
                        data = myFile.read().replace('\n', '')

                    passArr = {}
                    for pin in PINS:
                        passArr['NAME' + str(pin[0])] = str(pin[4])
                        if pin[3] is True:
                            if pin[2]:
                                passArr['PIN' + str(pin[0])] = 'on'
                            else:
                                passArr['PIN' + str(pin[0])] = 'off'
                    pyTE = PyTempNgin()
                    pyTE.load('index.html')
                    pyTE.render(passArr)
                    self.wfile.write(pyTE.getRenderedTemplate())
            else:
                self.wfile.write("error-op")
        else:
            self.wfile.write("error-arg")

    def do_POST(self):
        """Respond to a POST request."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write("<html><head><title>SmartServer 0.0.1</title></head>")
        self.wfile.write("<body>")
        self.wfile.write("</body></html>")

Robi się poważnie? W zasadzie to cała logika znajduje się w metodzie do_GET, ponieważ obsługę pozostawiliśmy tylko jej. Spokojnie, to jest dopiero początek i wiem, że ustawianie pinów powinno być w do_POST, ale łatwiej będzie debugować działania i sprawdzać funkcjonalność na początku.

Co robi do_GET? Na początku ustawia nagłówek, zwraca kod 200, który dla przeglądarki oznacza, że wszystko poszło zgodnie z planem. Następnie przychodzący URL jest dzielony na argumentu, które znajdują się pomiędzy / w zapytaniu. Przykładowo…

localhost:5000/on/1

… z tego linku uzyskamy [‚on’, 1]. Kolejnym krokiem jest sprawdzenie czy uzyskaliśmy cokolwiek z adresu. Potem odpowiednio od pierwszego argumentu przechodzimy do operacji, która za niego odpowiada. Reszta powinna być już do rozczytania: zmiana wartości w tablicy PINS oraz faktyczna zmiana na szynie GPIO.

Kiedy już to wszystko zrobimy to musimy uruchomić serwer i sprawdzić jak to działa.

if __name__ == '__main__':
    server_class = BaseHTTPServer.HTTPServer
    httpd = server_class((HOST_NAME, PORT_NUMBER), MyHandler)
    print time.asctime(), "SmartServer Starts - %s:%s" % (HOST_NAME, PORT_NUMBER)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    httpd.server_close()
    print time.asctime(), "Server Stops - %s:%s" % (HOST_NAME, PORT_NUMBER)

Uruchamiamy nasłuchiwanie i wykonujemy jedno z poniższych zapytań w przeglądarce na komputerze/telefonie.

http://raspberry:port/on/1
http://raspberry:port/off/1
http://raspberry:port/toggle/1
http://raspberry:port/get/1

Wszystko powinno działać jak należy. Jeśli nie działa to sprawdź numery pinów w PINS oraz czy wszystko jest dobrze podpięte. Proponuję również sklonowanie projektu z GitHub’a dzięki czemu nie powinno się nic wysypać. Pamiętajcie o zmianie portu oraz adresu host’a. U mnie wpisany jest przykładowy (10.0.10.4).

Podsumowanie

Skrypt ten jest dość prymitywny, ale do późniejszej obsługi będziemy używali mojej bardziej zaawansowanej implementacji. W przyszłym wpisie pokażę jak połączyć to z Google Assistant. Wiem, że była teraz dość długa przerwa, ale stwierdziłem, że potrzebuję trochę odpoczynku i grałem w Minecraft’a. Bawiłem się również w nagrywanie filmów, ale nie wiem czy coś z tego wyjdzie. Może kiedyś.

See ya!

Source: github.com/Pelski/PySmartSystem

#shareShare on FacebookShare on Google+Tweet about this on TwitterShare on TumblrPin on PinterestShare on LinkedInShare on VKShare on RedditEmail this to someone