Write most of the logic

RIP committing as you go.

This seems to have all the issue functionality of `actioner`, but in a much simpler and more efficient way.
Jake Howard 2020-01-08 22:36:17 +00:00
parent b8eb2645e7
commit 004b4e3952
Signed by: jake
GPG key ID: 57AFB45680EDD477
4 changed files with 85 additions and 8 deletions

@@ -1,2 +1,3 @@
+python-dateutil==2.8.1
 todoist-python==8.1.1
 urlextract==0.14.0

@@ -1,21 +1,71 @@
 #!/usr/bin/env python3
-from .clients import todoist
-from .utils import get_github_task
+import datetime
+
+from dateutil.relativedelta import relativedelta
+
+from .clients import github, todoist
+from .utils import get_github_issue_details, get_github_task, get_issue
+
+
+def get_issue_link(issue_or_pr) -> str:
+    return "[#{id}]({url})".format(id=issue_or_pr.number, url=issue_or_pr.html_url)
+
+
+def issue_to_task_name(issue) -> str:
+    return get_issue_link(issue) + ": " + issue.title
 
 
 def get_relevant_todoist_tasks():
     todoist.items.sync()
     tasks = {}
     for task in todoist.items.all():
-        if get_github_task(task["content"]):
-            tasks[task['content']] = task
+        github_task = get_github_task(task["content"])
+        if github_task:
+            tasks[github_task] = task
     return tasks
 
 
+def is_task_completed(task):
+    return task.data.get("checked", 0)
+
+
 def main():
     todoist_tasks = get_relevant_todoist_tasks()
+    relevant_since = datetime.datetime.now() - relativedelta(
+        weeks=30
+    )  # TODO: Make this a sane number
+    tasks_actioned = []
+    me = github.get_user()
+    for assigned_issue in me.get_issues(state="all", since=relevant_since):
+        task = todoist_tasks.get(assigned_issue.html_url)
+        if not task and assigned_issue.state == "open":
+            task = todoist.items.add(issue_to_task_name(assigned_issue))
+        if not task:
+            continue
+        tasks_actioned.append(task["id"])
+        if assigned_issue == "closed" and not is_task_completed(task):
+            print("completing", assigned_issue)
+            task.complete()
+        if is_task_completed(task):
+            print("uncompleting task", assigned_issue)
+            task.uncomplete()
+        if task["content"] != issue_to_task_name(assigned_issue):
+            print("updating issue name for", assigned_issue)
+            task.update(content=issue_to_task_name(assigned_issue))
+        if assigned_issue.milestone and assigned_issue.milestone.due_on:
+            task.update(
+                date_string=assigned_issue.milestone.due_on.strftime("%d/%m/%Y")
+            )
+    for task in todoist_tasks.values():
+        if not is_task_completed(task) or task["id"] in tasks_actioned:
+            continue
+        org, repo, issue_number = get_github_issue_details(task["content"])
+        issue = get_issue(me, org, repo, issue_number)
+        me_assigned = me.login in {assignee.login for assignee in issue.assignees}
+        if not me_assigned:
+            print("Deleting", issue)
+            task.delete()
 
 
 if __name__ == "__main__":
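
The most consequential change above is in get_relevant_todoist_tasks: the dictionary is now keyed by the GitHub URL extracted from each task's content rather than by the raw content string, which is what lets main() match issues to tasks with todoist_tasks.get(assigned_issue.html_url) even after a task's title drifts. A minimal sketch of that lookup, using made-up task data:

# Hypothetical data: one Todoist item whose content embeds a GitHub issue link.
todoist_tasks = {
    "https://github.com/example-org/example-repo/issues/7": {
        "id": 123,
        "content": "[#7](https://github.com/example-org/example-repo/issues/7): Old title",
    },
}

# main() looks the task up by the issue's canonical URL, so a later title edit
# on either side does not break the match.
html_url = "https://github.com/example-org/example-repo/issues/7"
task = todoist_tasks.get(html_url)
assert task is not None and task["id"] == 123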

@@ -1,4 +1,7 @@
-from todoist import TodoistAPI
 import os
 
+from github import Github
+from todoist import TodoistAPI
+
 todoist = TodoistAPI(os.environ["TODOIST_TOKEN"])
+github = Github(os.environ["GITHUB_TOKEN"])
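
Both clients are constructed from environment variables at import time, so TODOIST_TOKEN and GITHUB_TOKEN must be set before anything imports this module (a missing variable raises KeyError). A rough sketch, where the token values are placeholders and the import path is an assumption since the diff view does not show file names:

import os

# Placeholder values for illustration; real tokens must already be in the environment.
os.environ.setdefault("TODOIST_TOKEN", "<todoist-api-token>")
os.environ.setdefault("GITHUB_TOKEN", "<github-personal-access-token>")

from clients import github, todoist  # assumed module path

print(type(todoist).__name__, type(github).__name__)  # TodoistAPI Github
# With real tokens, github.get_user() returns the authenticated user and
# todoist.items.sync() / todoist.items.all() back the task loop in main().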

@@ -1,7 +1,11 @@
+import re
 from typing import Optional
-from urlextract import URLExtract
 from urllib.parse import urlparse
 
+from github.Issue import Issue
+from urlextract import URLExtract
+
+GITHUB_ISSUE_PR_RE = re.compile(r"\/(.+?)\/(.+?)\/(pull|issues)\/(\d+?)$")
+
 extractor = URLExtract()
@@ -10,5 +14,24 @@ def get_github_task(content) -> Optional[str]:
     if "github" not in content.lower():
         return None
     for url in extractor.gen_urls(content):
-        if urlparse(url).netloc == "github.com":
+        parsed_url = urlparse(url)
+        if parsed_url.netloc == "github.com" and GITHUB_ISSUE_PR_RE.search(
+            parsed_url.path
+        ):
             return url
+
+
+def get_github_issue_details(content):
+    url = get_github_task(content)
+    if not url:
+        return
+    parsed_url = urlparse(url)
+    match = GITHUB_ISSUE_PR_RE.search(parsed_url.path)
+    return match.group(1), match.group(2), match.group(4)
+
+
+def get_issue(me, org, repo, issue_num):
+    headers, data = me._requester.requestJsonAndCheck(
+        "GET", f"/repos/{org}/{repo}/issues/{issue_num}"
+    )
+    return Issue(me._requester, headers, data, completed=True)
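
A quick, hedged check of the new URL helpers: the import path and sample content below are illustrative only, but the behaviour follows directly from GITHUB_ISSUE_PR_RE and get_github_task as added above.

from utils import get_github_issue_details, get_github_task  # assumed module path

content = "Fix the build https://github.com/example-org/example-repo/issues/7"
print(get_github_task(content))
# https://github.com/example-org/example-repo/issues/7
print(get_github_issue_details(content))
# ('example-org', 'example-repo', '7')

Note that get_issue fetches the issue through PyGithub's internal _requester with a raw /repos/{org}/{repo}/issues/{n} request, since only the org and repo names (not a Repository object) are available after parsing the URL.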