summaryrefslogtreecommitdiff
path: root/scripts/p7s/bitwarden.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/p7s/bitwarden.py')
-rw-r--r--scripts/p7s/bitwarden.py61
1 files changed, 61 insertions, 0 deletions
diff --git a/scripts/p7s/bitwarden.py b/scripts/p7s/bitwarden.py
new file mode 100644
index 00000000..9ce4d44c
--- /dev/null
+++ b/scripts/p7s/bitwarden.py
@@ -0,0 +1,61 @@
+import contextlib
+import io
+import json
+import os
+import subprocess
+import zipfile
+
+import httpx
+
+from p7s import appdirs
+
+
+class Bitwarden():
+ def download(self, check_version=False):
+ self.bw_command = appdirs.user_cache_dir() / "bw"
+ if self.bw_command.exists() and not check_version:
+ return self.bw_command
+ r = httpx.get("https://vault.bitwarden.com/download/?app=cli&platform=linux")
+ location = r.headers["location"]
+ version = location.split("/")[7]
+ bw_versioned_command = appdirs.user_cache_dir() / f"bw-{version}"
+ if not bw_versioned_command.exists():
+ with zipfile.ZipFile(io.BytesIO(httpx.get(location, follow_redirects=True).content)) as zip:
+ with zip.open("bw") as zip_bw:
+ bw_versioned_command.write_bytes(zip_bw.read())
+ bw_versioned_command.chmod(0o755)
+ self.bw_command.unlink(missing_ok=True)
+ self.bw_command.symlink_to(bw_versioned_command)
+
+ @contextlib.contextmanager
+ def login(self, server, email):
+ subprocess.run([self.bw_command, "config", "server", server], check=True)
+ status = self.status()["status"]
+ if status == "unauthenticated":
+ command = ["login", email]
+ elif status == "locked":
+ command = ["unlock"]
+ else:
+ assert False, f"unexpected status {status}"
+ command = subprocess.run([self.bw_command] + command, check=True, stdout=subprocess.PIPE, encoding="UTF8")
+ export_line = command.stdout.splitlines()[3]
+ session = export_line.split('"')[1]
+ os.environ["BW_SESSION"] = session
+ try:
+ yield self.bw_command
+ subprocess.run([self.bw_command, "lock"], check=True)
+ finally:
+ del os.environ["BW_SESSION"]
+
+ def status(self):
+ return json.loads(subprocess.run([self.bw_command, "status"], check=True, stdout=subprocess.PIPE).stdout)
+
+ def get_item(self, uuid):
+ return json.loads(subprocess.run([self.bw_command, "get", "item", uuid], check=True, stdout=subprocess.PIPE).stdout)
+
+
+def get_item(server, email, uuid):
+ b = Bitwarden()
+ b.download()
+ with b.login(server, email):
+ return b.get_item(uuid)