diff options
Diffstat (limited to 'scripts/p7s/bitwarden.py')
| -rw-r--r-- | scripts/p7s/bitwarden.py | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/scripts/p7s/bitwarden.py b/scripts/p7s/bitwarden.py new file mode 100644 index 00000000..9ce4d44c --- /dev/null +++ b/scripts/p7s/bitwarden.py @@ -0,0 +1,61 @@ +import contextlib +import io +import json +import os +import subprocess +import zipfile + +import httpx + +from p7s import appdirs + + +class Bitwarden(): + def download(self, check_version=False): + self.bw_command = appdirs.user_cache_dir() / "bw" + if self.bw_command.exists() and not check_version: + return self.bw_command + r = httpx.get("https://vault.bitwarden.com/download/?app=cli&platform=linux") + location = r.headers["location"] + version = location.split("/")[7] + bw_versioned_command = appdirs.user_cache_dir() / f"bw-{version}" + if not bw_versioned_command.exists(): + with zipfile.ZipFile(io.BytesIO(httpx.get(location, follow_redirects=True).content)) as zip: + with zip.open("bw") as zip_bw: + bw_versioned_command.write_bytes(zip_bw.read()) + bw_versioned_command.chmod(0o755) + self.bw_command.unlink(missing_ok=True) + self.bw_command.symlink_to(bw_versioned_command) + + @contextlib.contextmanager + def login(self, server, email): + subprocess.run([self.bw_command, "config", "server", server], check=True) + status = self.status()["status"] + if status == "unauthenticated": + command = ["login", email] + elif status == "locked": + command = ["unlock"] + else: + assert False, f"unexpected status {status}" + command = subprocess.run([self.bw_command] + command, check=True, stdout=subprocess.PIPE, encoding="UTF8") + export_line = command.stdout.splitlines()[3] + session = export_line.split('"')[1] + os.environ["BW_SESSION"] = session + try: + yield self.bw_command + subprocess.run([self.bw_command, "lock"], check=True) + finally: + del os.environ["BW_SESSION"] + + def status(self): + return json.loads(subprocess.run([self.bw_command, "status"], check=True, stdout=subprocess.PIPE).stdout) + + def get_item(self, uuid): + return json.loads(subprocess.run([self.bw_command, "get", "item", uuid], check=True, stdout=subprocess.PIPE).stdout) + + +def get_item(server, email, uuid): + b = Bitwarden() + b.download() + with b.login(server, email): + return b.get_item(uuid) |
