summaryrefslogtreecommitdiff
path: root/scripts/p7s/bitwarden.py
blob: f3f26dd11f724f4e3a5f224698bf4ff82ebbbca1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import contextlib
import io
import json
import os
import subprocess
import zipfile

import httpx

from p7s import appdirs


class Bitwarden():
    def download(self, check_version=False):
        self.bw_command = appdirs.user_cache_dir() / "bw"
        if self.bw_command.exists() and not check_version:
            return self.bw_command
        r = httpx.get("https://vault.bitwarden.com/download/?app=cli&platform=linux")
        location = r.headers["location"]
        version = location.split("/")[7]
        bw_versioned_command = appdirs.user_cache_dir() / f"bw-{version}"
        if not bw_versioned_command.exists():
            with zipfile.ZipFile(io.BytesIO(httpx.get(location, follow_redirects=True).content)) as zip:
                with zip.open("bw") as zip_bw:
                    bw_versioned_command.write_bytes(zip_bw.read())
            bw_versioned_command.chmod(0o755)
        self.bw_command.unlink(missing_ok=True)
        self.bw_command.symlink_to(bw_versioned_command)

    @contextlib.contextmanager
    def login(self, server, email):
        subprocess.run([self.bw_command, "config", "server", server], check=True)
        status = self.status()["status"]
        if status == "unauthenticated":
            command = ["login", email]
        elif status == "locked":
            command = ["unlock"]
        else:
            assert False, f"unexpected status {status}"
        command = subprocess.run([self.bw_command] + command, check=True, stdout=subprocess.PIPE, encoding="UTF8")
        export_line = command.stdout.splitlines()[3]
        session = export_line.split('"')[1]
        os.environ["BW_SESSION"] = session
        try:
            yield self.bw_command
            subprocess.run([self.bw_command, "lock"], check=True)
        finally:
            del os.environ["BW_SESSION"]

    def sync(self):
        subprocess.run([self.bw_command, "sync"], check=True)

    def status(self):
        return json.loads(subprocess.run([self.bw_command, "status"], check=True, stdout=subprocess.PIPE).stdout)

    def get_item(self, uuid):
        return json.loads(subprocess.run([self.bw_command, "get", "item", uuid], check=True, stdout=subprocess.PIPE).stdout)


def get_item(server, email, uuid):
    b = Bitwarden()
    b.download()
    with b.login(server, email):
        b.sync()
        return b.get_item(uuid)