1 Star 0 Fork 0

CNCF/devstatscode

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ghapi.go 56.81 KB
一键复制 编辑 原始数据 按行查看 历史
Łukasz Gryglicki 提交于 2023-12-24 16:01 +08:00 . Support variable threads counts
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109
package devstatscode
import (
"context"
"database/sql"
"fmt"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/google/go-github/v38/github"
"golang.org/x/oauth2"
)
// IssueConfig - holds issue data
// It bundles one issue state together with the issue event that produced it;
// the functions in this file turn it into artificial database events.
type IssueConfig struct {
	// Repo is the repository name; it is written to the dup_repo_name columns.
	Repo string
	// Number is the issue number (printed next to Repo in summaries).
	Number int
	// IssueID is the issue ID, used as gha_issues.id.
	IssueID int64
	// Pr - presumably true when the issue is a pull request; TODO confirm against the code that fills it.
	Pr bool
	// MilestoneID is optional; nil is rendered as 0 by String/configStr.
	MilestoneID *int64
	// Labels - presumably a printable summary of LabelsMap; verify against the code that fills it.
	Labels string
	// LabelsMap maps label ID to label name (see the gha_issues_labels insert).
	LabelsMap map[int64]string
	// GhIssue is the GitHub API issue object.
	GhIssue *github.Issue
	// CreatedAt is the event time; it is the second sort key and the dup_created_at value.
	CreatedAt time.Time
	// EventID is the API event ID; artificial events are stored under 281474976710656 + EventID.
	EventID int64
	// EventType is written to gha_events.type and the dup_type columns.
	EventType string
	// GhEvent is the GitHub API issue event object.
	GhEvent *github.IssueEvent
	// AssigneeID is optional; nil is rendered as 0 by String/configStr.
	AssigneeID *int64
	// Assignees - presumably a printable summary of AssigneesMap; verify against the code that fills it.
	Assignees string
	// AssigneesMap is keyed by assignee ID (see the gha_issues_assignees insert);
	// values are presumably logins - TODO confirm.
	AssigneesMap map[int64]string
}
// String renders the configuration on a single line, including the event
// details and the label/assignee maps. Unset (nil) milestone and assignee IDs
// are printed as 0.
func (ic IssueConfig) String() string {
	// orZero dereferences an optional ID, mapping nil to 0.
	orZero := func(id *int64) int64 {
		if id == nil {
			return 0
		}
		return *id
	}
	const layout = "{Repo: %s, Number: %d, IssueID: %d, EventID: %d, EventType: %s, Pr: %v, MilestoneID: %d, AssigneeID: %d, CreatedAt: %s, Labels: %s, LabelsMap: %+v, Assignees: %s, AssigneesMap: %+v}"
	return fmt.Sprintf(
		layout,
		ic.Repo, ic.Number, ic.IssueID,
		ic.EventID, ic.EventType, ic.Pr,
		orZero(ic.MilestoneID), orZero(ic.AssigneeID),
		ToYMDHMSDate(ic.CreatedAt),
		ic.Labels, ic.LabelsMap,
		ic.Assignees, ic.AssigneesMap,
	)
}
// configStr renders only the fields that describe the issue state (no event
// ID, type or date), so two configurations can be compared as strings.
// Unset (nil) milestone and assignee IDs are printed as 0.
func (ic IssueConfig) configStr() string {
	var milestone, assignee int64
	if p := ic.MilestoneID; p != nil {
		milestone = *p
	}
	if p := ic.AssigneeID; p != nil {
		assignee = *p
	}
	const layout = "{Repo: %s, Number: %d, IssueID: %d, MilestoneID: %d, AssigneeID: %d, Labels: %s, Assignees: %s}"
	return fmt.Sprintf(layout, ic.Repo, ic.Number, ic.IssueID, milestone, assignee, ic.Labels, ic.Assignees)
}
// outputIssuesInfo prints a summary of the issue data that is about to be
// processed, under the heading info:
//   - one line per "repo number" listing its sorted "date event-type" entries,
//   - a warning for every event ID that occurs more than once,
//   - every pair of configurations of the same issue whose state differs.
func outputIssuesInfo(issues map[int64]IssueConfigAry, info string) {
	Printf("%s:\n", info)
	// seen maps event ID -> {GitHub issue ID, number of occurrences}.
	seen := make(map[int64][2]int64)
	// entriesByIssue maps "repo number" -> "date event-type" entries.
	entriesByIssue := make(map[string][]string)
	for _, configs := range issues {
		for _, cfg := range configs {
			// A missing entry reads as the zero array, so the first hit counts as 1.
			seen[cfg.EventID] = [2]int64{*cfg.GhIssue.ID, seen[cfg.EventID][1] + 1}
			label := fmt.Sprintf("%s %d", cfg.Repo, cfg.Number)
			entry := fmt.Sprintf("%s %s", ToYMDHMSDate(cfg.CreatedAt), cfg.EventType)
			// append on a missing key starts from a nil slice.
			entriesByIssue[label] = append(entriesByIssue[label], entry)
		}
	}

	// Map iteration order is random: sort labels for stable output.
	labels := make([]string, 0, len(entriesByIssue))
	for label := range entriesByIssue {
		labels = append(labels, label)
	}
	sort.Strings(labels)
	for _, label := range labels {
		entries := append([]string(nil), entriesByIssue[label]...)
		sort.Strings(entries)
		Printf("%s: [%s]\n", label, strings.Join(entries, ", "))
	}

	for eventID, stat := range seen {
		if stat[1] <= 1 {
			continue
		}
		Printf("Warning: Duplicate event %d(%d): %v\n", eventID, stat[1], issues[stat[0]])
	}

	// Report every pair of configurations of one issue that differ in state.
	for _, configs := range issues {
		states := make([]string, len(configs))
		for i := range configs {
			states[i] = configs[i].configStr()
		}
		for i := range states {
			for j := i + 1; j < len(states); j++ {
				if states[i] == states[j] {
					continue
				}
				Printf("StateA: %v\n", states[i])
				Printf("StateB: %v\n\n", states[j])
			}
		}
	}
}
// outputPRsInfo prints, under the heading info, a sorted comma-separated list
// of the pull requests to process. A PR is shown as "<repo full name> <number>"
// when both are known, otherwise as "<map key>" in angle brackets.
func outputPRsInfo(prs map[int64]github.PullRequest, info string) {
	Printf("%s:\n", info)
	descriptions := make([]string, 0, len(prs))
	for id, pr := range prs {
		named := pr.Number != nil && pr.Base != nil && pr.Base.Repo != nil && pr.Base.Repo.FullName != nil
		if !named {
			descriptions = append(descriptions, fmt.Sprintf("<%d>", id))
			continue
		}
		descriptions = append(descriptions, fmt.Sprintf("%s %d", *pr.Base.Repo.FullName, *pr.Number))
	}
	sort.Strings(descriptions)
	Printf("PRs: %s\n", strings.Join(descriptions, ", "))
}
// outputInfo prints the messages gathered in infos under the heading info.
// Keys are printed in sorted order, each followed by its sorted messages, one
// per tab-indented line.
func outputInfo(infos map[string][]string, info string) {
	Printf("%s:\n", info)
	topics := make([]string, 0, len(infos))
	for topic := range infos {
		topics = append(topics, topic)
	}
	sort.Strings(topics)
	for _, topic := range topics {
		messages := infos[topic]
		// Sorted in place: the caller's slices end up sorted as well.
		sort.Strings(messages)
		Printf("%s:\n\t%s\n", topic, strings.Join(messages, "\n\t"))
	}
}
// IssueConfigAry - allows sorting IssueConfig array by IssueID and then event
// creation date (ties are broken by EventID). It implements sort.Interface.
type IssueConfigAry []IssueConfig

// Len returns the number of configurations.
func (ic IssueConfigAry) Len() int { return len(ic) }

// Swap exchanges the configurations at positions i and j.
func (ic IssueConfigAry) Swap(i, j int) { ic[i], ic[j] = ic[j], ic[i] }

// Less orders by IssueID, then by CreatedAt, then by EventID.
func (ic IssueConfigAry) Less(i, j int) bool {
	a, b := &ic[i], &ic[j]
	if a.IssueID != b.IssueID {
		return a.IssueID < b.IssueID
	}
	// Compare instants with Equal, not != : the operators also compare the
	// Location and the monotonic clock reading, so two values for the same
	// instant could look different and skip the EventID tie-break below.
	if !a.CreatedAt.Equal(b.CreatedAt) {
		return a.CreatedAt.Before(b.CreatedAt)
	}
	return a.EventID < b.EventID
}
// GetRateLimits - queries the rate limits of every client in gcs.
// When core=true the Core limits are used, when core=false the Search limits.
//
// It returns, in this order:
//   - hint: index of the client with the most remaining points (ties go to the
//     one with the shorter wait until reset),
//   - the limit, remaining points and wait-until-reset duration per client.
//
// For a client whose limits could not be read, limit and remaining are -1; the
// wait is the period parsed from the error message, or 5 seconds otherwise.
func GetRateLimits(gctx context.Context, ctx *Ctx, gcs []*github.Client, core bool) (int, []int, []int, []time.Duration) {
	var (
		limits     []int
		remainings []int
		durations  []time.Duration
	)
	// record appends one client's figures to all three result slices.
	record := func(limit, remaining int, wait time.Duration) {
		limits = append(limits, limit)
		remainings = append(remainings, remaining)
		durations = append(durations, wait)
	}
	for i, client := range gcs {
		rates, _, err := client.RateLimits(gctx)
		if err != nil {
			if wait, parsed := PeriodParse(err.Error()); parsed {
				Printf("Parsed wait time from error message: %v\n", wait)
				record(-1, -1, wait)
				continue
			}
			Printf("GetRateLimit(%d): %v\n", i, err)
		}
		if rates == nil {
			record(-1, -1, 5*time.Second)
			continue
		}
		rate := rates.Search
		if core {
			rate = rates.Core
		}
		// One extra second of wait on top of the time left until reset.
		record(rate.Limit, rate.Remaining, time.Until(rate.Reset.Time)+time.Second)
	}

	hint := 0
	for i := range limits {
		switch {
		case remainings[i] > remainings[hint]:
			hint = i
		case i != hint && remainings[i] == remainings[hint] && durations[i] < durations[hint]:
			hint = i
		}
	}
	if ctx.GitHubDebug > 0 {
		Printf("GetRateLimits: hint: %d, limits: %+v, remaining: %+v, reset: %+v\n", hint, limits, remainings, durations)
	}
	return hint, limits, remainings, durations
}
// GHClient - get GitHub clients
// The OAuth setting comes from ctx.GitHubOAuth. When it contains "/" it is
// treated as a file name and the trimmed file contents are used instead.
// The value "-" yields a single unauthenticated client; any other value is
// split on "," and one authenticated client is created per token.
func GHClient(ctx *Ctx) (ghCtx context.Context, clients []*github.Client) {
	tokens := ctx.GitHubOAuth
	if strings.Contains(tokens, "/") {
		contents, err := ReadFile(ctx, tokens)
		FatalOnError(err)
		tokens = strings.TrimSpace(string(contents))
	}

	ghCtx = context.Background()
	if tokens == "-" {
		// Public (unauthenticated) access.
		clients = append(clients, github.NewClient(nil))
		return ghCtx, clients
	}
	for _, token := range strings.Split(tokens, ",") {
		source := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token})
		clients = append(clients, github.NewClient(oauth2.NewClient(ghCtx, source)))
	}
	return ghCtx, clients
}
// HandlePossibleError - display error specific message, detect rate limit and abuse
// It returns "" when err is nil, otherwise a short code naming the recognized
// condition: "rate", Abuse, IssueIsDeleted, NotFound, "server_error" or
// MovedPermanently. An error that matches none of them is printed and the
// process exits with status 0.
func HandlePossibleError(err error, cfg, info string) string {
	if err == nil {
		return ""
	}
	switch err.(type) {
	case *github.RateLimitError:
		Printf("Rate limit (%s) for %v\n", info, cfg)
		return "rate"
	case *github.AbuseRateLimitError:
		Printf("Abuse detected (%s) for %v\n", info, cfg)
		return Abuse
	}

	// Remaining conditions are recognized by the text of the error message.
	message := err.Error()
	switch {
	case strings.Contains(message, "410 This issue was deleted"):
		Printf("Issue was deleted (%s) for %v: %v\n", info, cfg, err)
		return IssueIsDeleted
	case strings.Contains(message, "404 Not Found"):
		Printf("Not found (%s) for %v: %v\n", info, cfg, err)
		return NotFound
	case strings.Contains(message, "502 Server Error"):
		Printf("Server Error (%s) for %v: %v\n", info, cfg, err)
		return "server_error"
	case strings.Contains(message, "409 Git Repository is empty"):
		Printf("Git repository empty (%s) for %v: %v\n", info, cfg, err)
		return NotFound
	case strings.Contains(message, "301"):
		Printf("Moved Permanently (%s) for %v: %v\n", info, cfg, err)
		return MovedPermanently
	}

	// Unknown error: deliberately not fatal, report it and exit with status 0.
	Printf("%s error: %T:%v, non fatal, exiting 0 status\n", os.Args[0], err, err)
	os.Exit(0)
	return ""
}
// ghActorIDOrNil returns the user's ID field, or nil when the user itself is nil.
func ghActorIDOrNil(actPtr *github.User) interface{} {
	if actPtr != nil {
		return actPtr.ID
	}
	return nil
}
// ghActorLoginOrNil returns the user's login passed through maybeHide, or nil
// when the user or its login is missing.
func ghActorLoginOrNil(actPtr *github.User, maybeHide func(string) string) interface{} {
	if actPtr == nil || actPtr.Login == nil {
		return nil
	}
	login := *actPtr.Login
	return maybeHide(login)
}
// ghMilestoneIDOrNil returns the milestone's ID field, or nil when the
// milestone itself is nil.
func ghMilestoneIDOrNil(milPtr *github.Milestone) interface{} {
	if milPtr != nil {
		return milPtr.ID
	}
	return nil
}
// ghActor inserts a single GitHub user into gha_actors (insert-ignore) inside
// the transaction con. The login goes through maybeHide and the name is stored
// as an empty string. Users that are nil or have no login are skipped.
func ghActor(con *sql.Tx, ctx *Ctx, actor *github.User, maybeHide func(string) string) {
	if actor != nil && actor.Login != nil {
		query := InsertIgnore("into gha_actors(id, login, name) " + NValues(3))
		row := AnyArray{actor.ID, maybeHide(*actor.Login), ""}
		ExecSQLTxWithErr(con, ctx, query, row...)
	}
}
// ghMilestone inserts the milestone of ic.GhIssue into gha_milestones
// (insert-ignore) inside the transaction con, attached to event eid.
//
// The row id is ic.MilestoneID; the dup_* columns come from ic (repo, event
// type, creation date) and from the actor of ic.GhEvent. Logins go through
// maybeHide.
//
// Nothing is inserted when the issue has no milestone. Callers may decide to
// call this based on a different object (for example the pull request's
// milestone), so the issue's milestone can legitimately be nil here.
func ghMilestone(con *sql.Tx, ctx *Ctx, eid int64, ic *IssueConfig, maybeHide func(string) string) {
	if ic.GhIssue == nil || ic.GhIssue.Milestone == nil {
		return
	}
	milestone := ic.GhIssue.Milestone
	// The event or its actor may be missing: use the nil-safe helpers, as the
	// other inserts in this file do, instead of dereferencing them directly.
	var actor *github.User
	if ic.GhEvent != nil {
		actor = ic.GhEvent.Actor
	}
	// gha_milestones
	ExecSQLTxWithErr(
		con,
		ctx,
		InsertIgnore(
			fmt.Sprintf(
				"into gha_milestones("+
					"id, event_id, closed_at, closed_issues, created_at, creator_id, "+
					"description, due_on, number, open_issues, state, title, updated_at, "+
					"dup_actor_id, dup_actor_login, dup_repo_id, dup_repo_name, dup_type, dup_created_at, "+
					"dupn_creator_login) values("+
					"%s, %s, %s, %s, %s, %s, "+
					"%s, %s, %s, %s, %s, %s, %s, "+
					"%s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), %s, %s, %s, "+
					"%s)",
				NValue(1), NValue(2), NValue(3), NValue(4), NValue(5), NValue(6),
				NValue(7), NValue(8), NValue(9), NValue(10), NValue(11), NValue(12), NValue(13),
				NValue(14), NValue(15), NValue(16), NValue(17), NValue(18), NValue(19),
				NValue(20),
			),
		),
		AnyArray{
			ic.MilestoneID,
			eid,
			milestone.ClosedAt,
			milestone.ClosedIssues,
			milestone.CreatedAt,
			ghActorIDOrNil(milestone.Creator),
			TruncStringOrNil(milestone.Description, 0xffff),
			milestone.DueOn,
			milestone.Number,
			milestone.OpenIssues,
			milestone.State,
			TruncStringOrNil(milestone.Title, 200),
			milestone.UpdatedAt,
			ghActorIDOrNil(actor),
			ghActorLoginOrNil(actor, maybeHide),
			// The repo name is bound twice: for the repo_id sub-select and for dup_repo_name.
			ic.Repo,
			ic.Repo,
			ic.EventType,
			ic.CreatedAt,
			ghActorLoginOrNil(milestone.Creator, maybeHide),
		}...,
	)
}
// GetRecentRepos - returns the distinct repositories that have events in
// gha_events created after dtFrom. repos holds the names and rids the IDs;
// both slices are index-aligned.
func GetRecentRepos(c *sql.DB, ctx *Ctx, dtFrom time.Time) (repos []string, rids []int64) {
	query := fmt.Sprintf(
		"select distinct repo_id, dup_repo_name from gha_events where created_at > %s",
		NValue(1),
	)
	rows := QuerySQLWithErr(c, ctx, query, dtFrom)
	defer func() { FatalOnError(rows.Close()) }()
	for rows.Next() {
		var (
			id   int64
			name string
		)
		FatalOnError(rows.Scan(&id, &name))
		repos = append(repos, name)
		rids = append(rids, id)
	}
	FatalOnError(rows.Err())
	return repos, rids
}
// DeleteArtificialPREvent - deletes the pull request rows of an artificial event.
// The artificial event ID is 281474976710656 + cfg.EventID; rows with that
// event_id are removed from gha_pull_requests, gha_pull_requests_assignees and
// gha_pull_requests_requested_reviewers in one transaction.
// When ctx.SkipPDB is set nothing is written (only logged if ctx.Debug > 0).
func DeleteArtificialPREvent(c *sql.DB, ctx *Ctx, cfg *IssueConfig) (err error) {
	if ctx.SkipPDB {
		if ctx.Debug > 0 {
			Printf("No DB write: Delete PR '%v'\n", *cfg)
		}
		return nil
	}
	where := fmt.Sprintf(" where event_id = %d", 281474976710656+cfg.EventID)
	tables := []string{
		"gha_pull_requests",
		"gha_pull_requests_assignees",
		"gha_pull_requests_requested_reviewers",
	}
	// All deletes run in a single transaction.
	tx, err := c.Begin()
	FatalOnError(err)
	for _, table := range tables {
		ExecSQLTxWithErr(tx, ctx, "delete from "+table+where)
	}
	FatalOnError(tx.Commit())
	return err
}
// ArtificialPREvent - create artificial API event holding the given PR state.
//
// The artificial event ID is 281474976710656 + cfg.EventID. In one transaction
// it inserts (insert-ignore): the PR's actors, the issue milestone (if any),
// the gha_pull_requests row, the gha_events row, the gha_payloads row, and the
// PR's assignees and requested reviewers. It also sets pull_request_id on an
// already existing payload of the same issue and event.
//
// When ctx.SkipPDB is set nothing is written (only logged if ctx.Debug > 0).
// cfg.GhEvent, cfg.GhIssue.ID and pr.ID must be non-nil.
func ArtificialPREvent(c *sql.DB, ctx *Ctx, cfg *IssueConfig, pr *github.PullRequest) (err error) {
	if ctx.SkipPDB {
		if ctx.Debug > 0 {
			Printf("No DB write: PR '%v'\n", *cfg)
		}
		return nil
	}
	// To handle GDPR
	maybeHide := MaybeHideFunc(GetHidden(ctx, HideCfgFile))

	eventID := 281474976710656 + cfg.EventID
	eType := cfg.EventType
	eCreatedAt := cfg.CreatedAt
	event := cfg.GhEvent
	issue := cfg.GhIssue
	iid := *issue.ID
	actor := event.Actor

	// Start transaction
	tc, err := c.Begin()
	FatalOnError(err)

	// Actors; ghActor itself skips nil users.
	ghActor(tc, ctx, pr.User, maybeHide)
	baseSHA := ""
	headSHA := ""
	if pr.Base != nil && pr.Base.SHA != nil {
		baseSHA = *pr.Base.SHA
	}
	if pr.Head != nil && pr.Head.SHA != nil {
		headSHA = *pr.Head.SHA
	}
	ghActor(tc, ctx, pr.MergedBy, maybeHide)
	ghActor(tc, ctx, pr.Assignee, maybeHide)

	// ghMilestone stores the *issue's* milestone, so that one must exist too.
	if pr.Milestone != nil && issue.Milestone != nil {
		ghMilestone(tc, ctx, eventID, cfg, maybeHide)
	}

	prid := *pr.ID
	ExecSQLTxWithErr(
		tc,
		ctx,
		InsertIgnore(
			fmt.Sprintf(
				"into gha_pull_requests("+
					"id, event_id, user_id, base_sha, head_sha, merged_by_id, assignee_id, milestone_id, "+
					"number, state, title, body, created_at, updated_at, closed_at, merged_at, "+
					"merge_commit_sha, merged, mergeable, mergeable_state, comments, "+
					"maintainer_can_modify, commits, additions, deletions, changed_files, "+
					"dup_actor_id, dup_actor_login, dup_repo_id, dup_repo_name, dup_type, dup_created_at, "+
					"dup_user_login, dupn_assignee_login, dupn_merged_by_login) values("+
					"%s, %s, %s, %s, %s, %s, %s, %s, "+
					"%s, %s, %s, %s, %s, %s, %s, %s, "+
					"%s, %s, %s, %s, %s, "+
					"%s, %s, %s, %s, %s, "+
					"%s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), %s, %s, %s, "+
					"%s, %s, %s)",
				NValue(1), NValue(2), NValue(3), NValue(4), NValue(5), NValue(6), NValue(7), NValue(8),
				NValue(9), NValue(10), NValue(11), NValue(12), NValue(13), NValue(14), NValue(15), NValue(16),
				NValue(17), NValue(18), NValue(19), NValue(20), NValue(21),
				NValue(22), NValue(23), NValue(24), NValue(25), NValue(26),
				NValue(27), NValue(28), NValue(29), NValue(30), NValue(31), NValue(32),
				NValue(33), NValue(34), NValue(35),
			),
		),
		AnyArray{
			prid,
			eventID,
			ghActorIDOrNil(pr.User),
			baseSHA,
			headSHA,
			ghActorIDOrNil(pr.MergedBy),
			ghActorIDOrNil(pr.Assignee),
			ghMilestoneIDOrNil(pr.Milestone),
			pr.Number,
			pr.State,
			pr.Title,
			TruncStringOrNil(pr.Body, 0xffff),
			pr.CreatedAt,
			pr.UpdatedAt,
			TimeOrNil(pr.ClosedAt),
			TimeOrNil(pr.MergedAt),
			StringOrNil(pr.MergeCommitSHA),
			BoolOrNil(pr.Merged),
			BoolOrNil(pr.Mergeable),
			StringOrNil(pr.MergeableState),
			IntOrNil(pr.Comments),
			BoolOrNil(pr.MaintainerCanModify),
			IntOrNil(pr.Commits),
			IntOrNil(pr.Additions),
			IntOrNil(pr.Deletions),
			IntOrNil(pr.ChangedFiles),
			// Nil-safe, like the gha_events and gha_payloads inserts below.
			ghActorIDOrNil(actor),
			ghActorLoginOrNil(actor, maybeHide),
			// The repo name is bound twice: for the repo_id sub-select and for dup_repo_name.
			cfg.Repo,
			cfg.Repo,
			eType,
			eCreatedAt,
			ghActorLoginOrNil(pr.User, maybeHide),
			ghActorLoginOrNil(pr.Assignee, maybeHide),
			ghActorLoginOrNil(pr.MergedBy, maybeHide),
		}...,
	)

	// Create artificial event
	ExecSQLTxWithErr(
		tc,
		ctx,
		InsertIgnore(
			fmt.Sprintf(
				"into gha_events("+
					"id, type, actor_id, repo_id, public, created_at, "+
					"dup_actor_login, dup_repo_name, org_id, forkee_id) "+
					"values(%s, %s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), true, %s, "+
					"%s, %s, (select max(org_id) from gha_events where dup_repo_name = %s), null)",
				NValue(1), NValue(2), NValue(3), NValue(4),
				NValue(5), NValue(6), NValue(7), NValue(8),
			),
		),
		AnyArray{
			eventID,
			cfg.EventType,
			ghActorIDOrNil(event.Actor),
			cfg.Repo,
			eCreatedAt,
			ghActorLoginOrNil(event.Actor, maybeHide),
			cfg.Repo,
			cfg.Repo,
		}...,
	)

	// Create artificial event's payload
	ExecSQLTxWithErr(
		tc,
		ctx,
		InsertIgnore(
			fmt.Sprintf(
				"into gha_payloads("+
					"event_id, push_id, size, ref, head, befor, action, "+
					"issue_id, pull_request_id, comment_id, ref_type, master_branch, commit, "+
					"description, number, forkee_id, release_id, member_id, "+
					"dup_actor_id, dup_actor_login, dup_repo_id, dup_repo_name, dup_type, dup_created_at) "+
					"values(%s, null, null, null, null, null, %s, "+
					"%s, %s, null, null, null, null, "+
					"null, %s, null, null, null, "+
					"%s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), %s, %s, %s)",
				NValue(1), NValue(2), NValue(3), NValue(4), NValue(5), NValue(6),
				NValue(7), NValue(8), NValue(9), NValue(10), NValue(11),
			),
		),
		AnyArray{
			eventID,
			cfg.EventType,
			iid,
			prid,
			issue.Number,
			ghActorIDOrNil(event.Actor),
			ghActorLoginOrNil(event.Actor, maybeHide),
			cfg.Repo,
			cfg.Repo,
			cfg.EventType,
			eCreatedAt,
		}...,
	)

	// If such payload already existed, we need to set PR ID on it
	ExecSQLTxWithErr(
		tc,
		ctx,
		fmt.Sprintf(
			"update gha_payloads set pull_request_id = %s where issue_id = %s and event_id = %s",
			NValue(1),
			NValue(2),
			NValue(3),
		),
		AnyArray{prid, iid, eventID}...,
	)

	// Arrays: actors: assignees, requested_reviewers
	// assignees (ranging over a nil slice is a no-op)
	for _, assignee := range pr.Assignees {
		if assignee == nil {
			continue
		}
		ghActor(tc, ctx, assignee, maybeHide)
		// pull_request-assignee connection
		ExecSQLTxWithErr(
			tc,
			ctx,
			InsertIgnore(
				"into gha_pull_requests_assignees(pull_request_id, event_id, assignee_id) "+NValues(3),
			),
			AnyArray{prid, eventID, assignee.ID}...,
		)
	}

	// requested_reviewers
	for _, reviewer := range pr.RequestedReviewers {
		if reviewer == nil {
			continue
		}
		ghActor(tc, ctx, reviewer, maybeHide)
		// pull_request-requested_reviewer connection
		ExecSQLTxWithErr(
			tc,
			ctx,
			InsertIgnore(
				"into gha_pull_requests_requested_reviewers(pull_request_id, event_id, requested_reviewer_id) "+NValues(3),
			),
			AnyArray{prid, eventID, reviewer.ID}...,
		)
	}

	// Final commit
	FatalOnError(tc.Commit())
	return err
}
// DeleteArtificialEvent - deletes all rows of an artificial event.
// The artificial event ID is 281474976710656 + cfg.EventID. In one transaction
// it removes the rows with that event_id from the issue, milestone, payload and
// pull request tables, and finally the gha_events row with that id.
// When ctx.SkipPDB is set nothing is written (only logged if ctx.Debug > 0).
func DeleteArtificialEvent(c *sql.DB, ctx *Ctx, cfg *IssueConfig) (err error) {
	if ctx.SkipPDB {
		if ctx.Debug > 0 {
			Printf("No DB write: Delete Issue '%v'\n", *cfg)
		}
		return nil
	}
	eventID := 281474976710656 + cfg.EventID
	where := fmt.Sprintf(" where event_id = %d", eventID)
	// Tables that reference the event through their event_id column.
	tables := []string{
		"gha_issues_labels",
		"gha_issues_assignees",
		"gha_issues",
		"gha_milestones",
		"gha_payloads",
		"gha_pull_requests",
		"gha_pull_requests_assignees",
		"gha_pull_requests_requested_reviewers",
	}
	statements := make([]string, 0, len(tables)+1)
	for _, table := range tables {
		statements = append(statements, "delete from "+table+where)
	}
	// The event row itself is keyed by id and goes last.
	statements = append(statements, fmt.Sprintf("delete from gha_events where id = %d", eventID))

	tx, err := c.Begin()
	FatalOnError(err)
	for _, statement := range statements {
		ExecSQLTxWithErr(tx, ctx, statement)
	}
	FatalOnError(tx.Commit())
	return err
}
// ArtificialEvent - create artificial API event (but from the past)
//
// The artificial event ID is 281474976710656 (2^48) + cfg.EventID and
// cfg.CreatedAt is used as the event date. In one transaction it inserts
// (insert-ignore): the issue's actors, the gha_issues row, the milestone (if
// any), the gha_events row, the gha_payloads row, and one row per entry of
// cfg.LabelsMap and cfg.AssigneesMap.
//
// When ctx.SkipPDB is set nothing is written (only logged if ctx.Debug > 0).
func ArtificialEvent(c *sql.DB, ctx *Ctx, cfg *IssueConfig) (err error) {
	// github.com/google/go-github/github/issues_events.go
	if ctx.SkipPDB {
		if ctx.Debug > 0 {
			Printf("No DB write: Issue '%v'\n", *cfg)
		}
		return nil
	}

	issueID := cfg.IssueID
	ghIssue := cfg.GhIssue
	ghEvent := cfg.GhEvent
	// Create artificial event, add 2^48 to the API event ID
	artificialID := 281474976710656 + cfg.EventID
	stamp := cfg.CreatedAt

	// To handle GDPR
	maybeHide := MaybeHideFunc(GetHidden(ctx, HideCfgFile))

	// Start transaction
	tx, err := c.Begin()
	FatalOnError(err)

	// Actors
	ghActor(tx, ctx, ghIssue.Assignee, maybeHide)
	ghActor(tx, ctx, ghIssue.User, maybeHide)
	for _, user := range ghIssue.Assignees {
		ghActor(tx, ctx, user, maybeHide)
	}
	if ghIssue.Milestone != nil {
		ghActor(tx, ctx, ghIssue.Milestone.Creator, maybeHide)
	}

	// Create new issue state
	issueQuery := InsertIgnore(
		fmt.Sprintf(
			"into gha_issues("+
				"id, event_id, assignee_id, body, closed_at, comments, created_at, "+
				"locked, milestone_id, number, state, title, updated_at, user_id, "+
				"dup_actor_id, dup_actor_login, dup_repo_id, dup_repo_name, dup_type, dup_created_at, "+
				"dup_user_login, dupn_assignee_login, is_pull_request) "+
				"values(%s, %s, %s, %s, %s, %s, %s, "+
				"%s, %s, %s, %s, %s, %s, %s, "+
				"%s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), %s, %s, %s, "+
				"%s, %s, %s) ",
			NValue(1), NValue(2), NValue(3), NValue(4), NValue(5), NValue(6), NValue(7),
			NValue(8), NValue(9), NValue(10), NValue(11), NValue(12), NValue(13), NValue(14),
			NValue(15), NValue(16), NValue(17), NValue(18), NValue(19), NValue(20),
			NValue(21), NValue(22), NValue(23),
		),
	)
	ExecSQLTxWithErr(
		tx,
		ctx,
		issueQuery,
		AnyArray{
			issueID,
			artificialID,
			ghActorIDOrNil(ghIssue.Assignee),
			TruncStringOrNil(ghIssue.Body, 0xffff),
			TimeOrNil(ghIssue.ClosedAt),
			IntOrNil(ghIssue.Comments),
			ghIssue.CreatedAt,
			BoolOrNil(ghIssue.Locked),
			ghMilestoneIDOrNil(ghIssue.Milestone),
			ghIssue.Number,
			ghIssue.State,
			ghIssue.Title,
			// updated_at is the event date, not the issue's own update time.
			stamp,
			ghActorIDOrNil(ghIssue.User),
			ghActorIDOrNil(ghEvent.Actor),
			ghActorLoginOrNil(ghEvent.Actor, maybeHide),
			// The repo name is bound twice: for the repo_id sub-select and for dup_repo_name.
			cfg.Repo,
			cfg.Repo,
			cfg.EventType,
			stamp,
			ghActorLoginOrNil(ghIssue.User, maybeHide),
			ghActorLoginOrNil(ghIssue.Assignee, maybeHide),
			ghIssue.IsPullRequest(),
		}...,
	)

	// Create the milestone when the issue has one
	if ghIssue.Milestone != nil {
		ghMilestone(tx, ctx, artificialID, cfg, maybeHide)
	}

	// Create artificial event
	eventQuery := InsertIgnore(
		fmt.Sprintf(
			"into gha_events("+
				"id, type, actor_id, repo_id, public, created_at, "+
				"dup_actor_login, dup_repo_name, org_id, forkee_id) "+
				"values(%s, %s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), true, %s, "+
				"%s, %s, (select max(org_id) from gha_events where dup_repo_name = %s), null)",
			NValue(1), NValue(2), NValue(3), NValue(4),
			NValue(5), NValue(6), NValue(7), NValue(8),
		),
	)
	ExecSQLTxWithErr(
		tx,
		ctx,
		eventQuery,
		AnyArray{
			artificialID,
			cfg.EventType,
			ghActorIDOrNil(ghEvent.Actor),
			cfg.Repo,
			stamp,
			ghActorLoginOrNil(ghEvent.Actor, maybeHide),
			cfg.Repo,
			cfg.Repo,
		}...,
	)

	// Create artificial event's payload
	payloadQuery := InsertIgnore(
		fmt.Sprintf(
			"into gha_payloads("+
				"event_id, push_id, size, ref, head, befor, action, "+
				"issue_id, pull_request_id, comment_id, ref_type, master_branch, commit, "+
				"description, number, forkee_id, release_id, member_id, "+
				"dup_actor_id, dup_actor_login, dup_repo_id, dup_repo_name, dup_type, dup_created_at) "+
				"values(%s, null, null, null, null, null, %s, "+
				"%s, null, null, null, null, null, "+
				"null, %s, null, null, null, "+
				"%s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), %s, %s, %s)",
			NValue(1), NValue(2), NValue(3), NValue(4), NValue(5),
			NValue(6), NValue(7), NValue(8), NValue(9), NValue(10),
		),
	)
	ExecSQLTxWithErr(
		tx,
		ctx,
		payloadQuery,
		AnyArray{
			artificialID,
			cfg.EventType,
			issueID,
			ghIssue.Number,
			ghActorIDOrNil(ghEvent.Actor),
			ghActorLoginOrNil(ghEvent.Actor, maybeHide),
			cfg.Repo,
			cfg.Repo,
			cfg.EventType,
			stamp,
		}...,
	)

	// Add issue labels; the statement text is the same for every label.
	labelQuery := InsertIgnore(
		fmt.Sprintf(
			"into gha_issues_labels(issue_id, event_id, label_id, "+
				"dup_actor_id, dup_actor_login, dup_repo_id, dup_repo_name, "+
				"dup_type, dup_created_at, dup_issue_number, dup_label_name) "+
				"values(%s, %s, %s, "+
				"%s, %s, (select coalesce(max(repo_id), -1) from gha_events where dup_repo_name = %s), %s, "+
				"%s, %s, %s, %s)",
			NValue(1), NValue(2), NValue(3), NValue(4), NValue(5), NValue(6),
			NValue(7), NValue(8), NValue(9), NValue(10), NValue(11),
		),
	)
	for id, name := range cfg.LabelsMap {
		ExecSQLTxWithErr(
			tx,
			ctx,
			labelQuery,
			AnyArray{
				issueID,
				artificialID,
				id,
				ghActorIDOrNil(ghEvent.Actor),
				ghActorLoginOrNil(ghEvent.Actor, maybeHide),
				cfg.Repo,
				cfg.Repo,
				cfg.EventType,
				stamp,
				ghIssue.Number,
				name,
			}...,
		)
	}

	// Add issue assignees; only the map keys (assignee IDs) are stored.
	assigneeQuery := InsertIgnore(
		fmt.Sprintf(
			"into gha_issues_assignees(issue_id, event_id, assignee_id) "+
				"values(%s, %s, %s)",
			NValue(1), NValue(2), NValue(3),
		),
	)
	for id := range cfg.AssigneesMap {
		ExecSQLTxWithErr(tx, ctx, assigneeQuery, AnyArray{issueID, artificialID, id}...)
	}

	// Final commit
	FatalOnError(tx.Commit())
	return err
}
// SyncIssuesState synchronizes issues states
// manual:
//
// false: normal devstats sync cron mode using 'ghapi2db' tool
// true: manual sync using 'sync_issues' tool
func SyncIssuesState(gctx context.Context, gc []*github.Client, ctx *Ctx, c *sql.DB, issues map[int64]IssueConfigAry, prs map[int64]github.PullRequest, manual bool) {
nIssuesBefore := 0
for _, issueConfig := range issues {
nIssuesBefore += len(issueConfig)
}
// Sort issues to by their state changes in time
for issueID := range issues {
sort.Sort(issues[issueID])
if ctx.Debug > 1 {
Printf("Sorted: %+v\n", issues[issueID])
}
}
// Output data info
outputIssuesInfo(issues, "Issues to process")
// Get number of CPUs available
thrN := GetThreadsNum(ctx)
prc := 0
var issuesMutex = &sync.RWMutex{}
// Now iterate all issues/PR in MT mode
ch := make(chan bool)
nThreads := 0
dtStart := time.Now()
lastTime := dtStart
nIssues := 0
for _, issueConfig := range issues {
nIssues += len(issueConfig)
}
nPRs := len(prs)
checked := 0
var updatesMutex = &sync.Mutex{}
updates := []int{0, 0, 0}
// updates (non-manual mode):
// 0: no such event --> new
// 1: event exists and the same state --> no new
// 2: event exists with different state --> warning
// updates (manual mode)
// 0 - no such issue --> new
// 1: previous issue state exists, no new
// 2: previous issue state exists, new needed
infos := make(map[string][]string)
Printf("ghapi2db.go: Processing %d PRs, %d issues (%d with date collisions), manual mode: %v - GHA part\n", nPRs, nIssues, nIssuesBefore, manual)
// Use map key to pass to the closure
for key, issueConfig := range issues {
for idx := range issueConfig {
go func(ch chan bool, iid int64, idx int) {
why := ""
what := ""
// Refer to current tag using index passed to anonymous function
issuesMutex.RLock()
cfg := issues[iid][idx]
issuesMutex.RUnlock()
if ctx.Debug > 1 {
Printf("GHA Issue ID '%d' --> '%v'\n", iid, cfg)
}
var (
ghaMilestoneID *int64
ghaEventID int64
ghaClosedAt *time.Time
ghaState string
ghaTitle string
ghaLocked bool
ghaAssigneeID *int64
)
// Process current milestone (given issue and second)
apiMilestoneID := cfg.MilestoneID
apiClosedAt := cfg.GhIssue.ClosedAt
apiState := *cfg.GhIssue.State
apiTitle := *cfg.GhIssue.Title
apiLocked := *cfg.GhIssue.Locked
apiAssigneeID := cfg.AssigneeID
eventID := 281474976710656 + cfg.EventID
// Get eventual current state
var rowsM *sql.Rows
if manual {
rowsM = QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select milestone_id, event_id, closed_at, state, title, locked, assignee_id "+
"from gha_issues where id = %s "+
"order by updated_at desc, event_id desc limit 1",
NValue(1),
),
cfg.IssueID,
)
} else {
rowsM = QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select milestone_id, event_id, closed_at, state, title, locked, assignee_id "+
"from gha_issues where id = %s and event_id = %s",
NValue(1),
NValue(2),
),
cfg.IssueID,
eventID,
)
}
defer func() { FatalOnError(rowsM.Close()) }()
got := false
for rowsM.Next() {
FatalOnError(
rowsM.Scan(
&ghaMilestoneID,
&ghaEventID,
&ghaClosedAt,
&ghaState,
&ghaTitle,
&ghaLocked,
&ghaAssigneeID,
),
)
got = true
}
FatalOnError(rowsM.Err())
// Missing event
if !got {
if ctx.Debug > 1 {
Printf("Adding missing (%v) event '%v'\n", cfg.CreatedAt, cfg)
}
FatalOnError(
ArtificialEvent(
c,
ctx,
&cfg,
),
)
if manual {
why = "no previous issue state"
what = fmt.Sprintf("%s %d", cfg.Repo, cfg.Number)
} else {
why = "no issue event"
what = fmt.Sprintf("%s %d %s %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType)
}
updatesMutex.Lock()
updates[0]++
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
ch <- true
return
}
// Now have existing GHA event, but we don't know if it is a correct state event
// Or just bot comment after which (on the same second) milestone or label(s) are updated
// Check state change
changedState := false
if apiState != ghaState {
changedState = true
if ctx.Debug > 1 {
Printf("Updating issue '%v' state %s -> %s\n", cfg, ghaState, apiState)
}
why = "changed issue state"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", cfg.Repo, cfg.Number, ghaState, apiState)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, ghaState, apiState)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check title change
changedTitle := false
if apiTitle != ghaTitle {
changedTitle = true
if ctx.Debug > 1 {
Printf("Updating issue '%v' title %s -> %s\n", cfg, ghaTitle, apiTitle)
}
why = "changed issue title"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", cfg.Repo, cfg.Number, ghaTitle, apiTitle)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, ghaTitle, apiTitle)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check locked change
changedLocked := false
if apiLocked != ghaLocked {
changedLocked = true
if ctx.Debug > 1 {
Printf("Updating issue '%v' locked %v -> %v\n", cfg, ghaLocked, apiLocked)
}
why = "changed issue locked state"
if manual {
what = fmt.Sprintf("%s %d: %v -> %v", cfg.Repo, cfg.Number, ghaLocked, apiLocked)
} else {
what = fmt.Sprintf("%s %d %s %s: %v -> %v", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, ghaLocked, apiLocked)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check closed_at change
changedClosed := false
if (apiClosedAt == nil && ghaClosedAt != nil) || (apiClosedAt != nil && ghaClosedAt == nil) || (apiClosedAt != nil && ghaClosedAt != nil && ToYMDHMSDate(*apiClosedAt) != ToYMDHMSDate(*ghaClosedAt)) {
changedClosed = true
from := Null
if ghaClosedAt != nil {
from = fmt.Sprintf("%v", ToYMDHMSDate(*ghaClosedAt))
}
to := Null
if apiClosedAt != nil {
to = fmt.Sprintf("%v", ToYMDHMSDate(*apiClosedAt))
}
if ctx.Debug > 1 {
Printf("Updating issue '%v' closed_at %s -> %s\n", cfg, from, to)
}
why = "changed issue closed at"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", cfg.Repo, cfg.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check milestone change
changedMilestone := false
if (apiMilestoneID == nil && ghaMilestoneID != nil) || (apiMilestoneID != nil && ghaMilestoneID == nil) || (apiMilestoneID != nil && ghaMilestoneID != nil && *apiMilestoneID != *ghaMilestoneID) {
changedMilestone = true
from := Null
if ghaMilestoneID != nil {
from = fmt.Sprintf("%d", *ghaMilestoneID)
}
to := Null
if apiMilestoneID != nil {
to = fmt.Sprintf("%d", *apiMilestoneID)
}
if ctx.Debug > 1 {
Printf("Updating issue '%v' milestone %s -> %s\n", cfg, from, to)
}
why = "changed issue milestone"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", cfg.Repo, cfg.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check assignee change
changedAssignee := false
if (apiAssigneeID == nil && ghaAssigneeID != nil) || (apiAssigneeID != nil && ghaAssigneeID == nil) || (apiAssigneeID != nil && ghaAssigneeID != nil && *apiAssigneeID != *ghaAssigneeID) {
changedAssignee = true
from := Null
if ghaAssigneeID != nil {
from = fmt.Sprintf("%d", *ghaAssigneeID)
}
to := Null
if apiAssigneeID != nil {
to = fmt.Sprintf("%d", *apiAssigneeID)
}
if ctx.Debug > 1 {
Printf("Updating issue '%v' assignee %s -> %s\n", cfg, from, to)
}
why = "changed issue assignee"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", cfg.Repo, cfg.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Process current labels
rowsL := QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select coalesce(string_agg(sub.label_id::text, ','), '') from "+
"(select label_id from gha_issues_labels where event_id = %s "+
"order by label_id) sub",
NValue(1),
),
ghaEventID,
)
defer func() { FatalOnError(rowsL.Close()) }()
ghaLabels := ""
for rowsL.Next() {
FatalOnError(rowsL.Scan(&ghaLabels))
}
FatalOnError(rowsL.Err())
changedLabels := false
if ghaLabels != cfg.Labels {
if ctx.Debug > 1 {
Printf("Updating issue '%v' labels to '%s', they were: '%s' (event_id %d)\n", cfg, cfg.Labels, ghaLabels, ghaEventID)
}
changedLabels = true
why = "changed issue labels"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", cfg.Repo, cfg.Number, ghaLabels, cfg.Labels)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, ghaLabels, cfg.Labels)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Process current assignees
rowsA := QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select coalesce(string_agg(sub.assignee_id::text, ','), '') from "+
"(select assignee_id from gha_issues_assignees where event_id = %s "+
"order by assignee_id) sub",
NValue(1),
),
ghaEventID,
)
defer func() { FatalOnError(rowsA.Close()) }()
ghaAssignees := ""
for rowsA.Next() {
FatalOnError(rowsA.Scan(&ghaAssignees))
}
FatalOnError(rowsA.Err())
changedAssignees := false
if ghaAssignees != cfg.Assignees {
if ctx.Debug > 1 {
Printf("Updating issue '%v' assignees to '%s', they were: '%s' (event_id %d)\n", cfg, cfg.Assignees, ghaAssignees, ghaEventID)
}
changedAssignees = true
why = "changed issue assignees"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", cfg.Repo, cfg.Number, ghaAssignees, cfg.Assignees)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, ghaAssignees, cfg.Assignees)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
uidx := 1
why = "previous issue state the same"
if manual {
what = fmt.Sprintf("%s %d", cfg.Repo, cfg.Number)
} else {
what = fmt.Sprintf("%s %d %s %s", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType)
}
// Do the update if needed
changedAnything := changedMilestone || changedState || changedClosed || changedAssignee || changedTitle || changedLocked || changedLabels || changedAssignees
if changedAnything {
uidx = 2
if manual {
FatalOnError(
ArtificialEvent(
c,
ctx,
&cfg,
),
)
why = "previous issue state different"
what = fmt.Sprintf("%s %d", cfg.Repo, cfg.Number)
} else {
if ctx.Debug > 0 {
Printf("Debug: Exact artificial event (%v, %d) already exists with different state, skipping: '%v'\n", cfg.CreatedAt, eventID, cfg)
}
why = "collision and issue state differs"
what = fmt.Sprintf("%s %d %s %s: %d", cfg.Repo, cfg.Number, ToYMDHMSDate(cfg.CreatedAt), cfg.EventType, eventID)
if !ctx.SkipUpdateEvents {
why = "updated existing issue state"
FatalOnError(DeleteArtificialEvent(c, ctx, &cfg))
FatalOnError(ArtificialEvent(c, ctx, &cfg))
}
}
}
if ctx.Debug > 1 {
if manual {
Printf("Previous event (event_id: %d), added artificial: %v: '%v'\n", ghaEventID, changedAnything, cfg)
} else {
Printf("Event for the same date (%v) exist (event_id: %d), added artificial: %v: '%v'\n", cfg.CreatedAt, ghaEventID, changedAnything, cfg)
}
}
updatesMutex.Lock()
updates[uidx]++
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
// Synchronize go routine
ch <- changedAnything
}(ch, key, idx)
// go routine called with 'ch' channel to sync and tag index
nThreads++
for nThreads >= thrN {
<-ch
nThreads--
prc++
if prc%20 == 0 {
thrN = GetThreadsNum(ctx)
}
checked++
ProgressInfo(checked, nIssues, dtStart, &lastTime, time.Duration(10)*time.Second, "")
}
}
}
// Usually all work happens on '<-ch'
for nThreads > 0 {
<-ch
nThreads--
checked++
ProgressInfo(checked, nIssues, dtStart, &lastTime, time.Duration(10)*time.Second, "")
}
// Get RateLimits info
hint, _, rem, wait := GetRateLimits(gctx, ctx, gc, true)
if manual {
Printf(
"ghapi2db.go: Manually processed %d issues/PRs (%d new issues, existing: %d not needed, %d added): %+v API points remain, resets in %+v, hint key: %d\n",
checked, updates[0], updates[1], updates[2], rem, wait, hint,
)
} else {
Printf(
"ghapi2db.go: Automatically processed %d issues/PRs (%d new, %d the same exists, %d incorrect state exists): %+v API points remain, resets in %+v, hint key: %d\n",
checked, updates[0], updates[1], updates[2], rem, wait, hint,
)
}
// Info
outputInfo(infos, "Issues")
// PRs sync (using state at run date XX:08+)
// Use map key to pass to the closure
outputPRsInfo(prs, "PRs to process")
infos = make(map[string][]string)
ch = make(chan bool)
nThreads = 0
dtStart = time.Now()
lastTime = dtStart
checked = 0
updates = []int{0, 0, 0}
var prsMutex = &sync.RWMutex{}
for iid := range prs {
go func(ch chan bool, iid int64) {
why := ""
what := ""
prsMutex.RLock()
pr := prs[iid]
ica := issues[iid]
l := len(ica)
ic := ica[l-1]
prsMutex.RUnlock()
prid := *pr.ID
updatedAt := *pr.UpdatedAt
if ctx.Debug > 1 {
Printf("GHA Issue ID '%d' --> PR ID %d, updated %v\n", iid, prid, updatedAt)
}
var (
ghaMilestoneID *int64
ghaEventID int64
ghaClosedAt *time.Time
ghaState string
ghaTitle string
ghaMergedByID *int64
ghaMergedAt *time.Time
ghaMerged *bool
ghaAssigneeID *int64
apiMilestoneID *int64
apiAssigneeID *int64
apiMergedByID *int64
)
// Process current milestone
if pr.Milestone != nil {
apiMilestoneID = pr.Milestone.ID
}
apiClosedAt := pr.ClosedAt
apiState := *pr.State
apiTitle := *pr.Title
if pr.Assignee != nil {
apiAssigneeID = pr.Assignee.ID
}
if pr.MergedBy != nil {
apiMergedByID = pr.MergedBy.ID
}
apiMergedAt := pr.MergedAt
apiMerged := pr.Merged
eventID := 281474976710656 + ic.EventID
// Get event for this date
var rowsM *sql.Rows
if manual {
rowsM = QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select milestone_id, event_id, closed_at, state, title, assignee_id, "+
"merged_by_id, merged_at, merged "+
"from gha_pull_requests where id = %s "+
"order by updated_at desc, event_id desc limit 1",
NValue(1),
),
prid,
)
} else {
rowsM = QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select milestone_id, event_id, closed_at, state, title, assignee_id, "+
"merged_by_id, merged_at, merged "+
"from gha_pull_requests where id = %s and event_id = %s",
NValue(1),
NValue(2),
),
prid,
eventID,
)
}
defer func() { FatalOnError(rowsM.Close()) }()
got := false
for rowsM.Next() {
FatalOnError(
rowsM.Scan(
&ghaMilestoneID,
&ghaEventID,
&ghaClosedAt,
&ghaState,
&ghaTitle,
&ghaAssigneeID,
&ghaMergedByID,
&ghaMergedAt,
&ghaMerged,
),
)
got = true
}
FatalOnError(rowsM.Err())
if !got {
if ctx.Debug > 1 {
Printf("Adding missing (%v) PR event '%v', PR ID: %d\n", updatedAt, ic, prid)
}
FatalOnError(
ArtificialPREvent(
c,
ctx,
&ic,
&pr,
),
)
if manual {
why = "no previous pr state"
what = fmt.Sprintf("%s %d", ic.Repo, ic.Number)
} else {
why = "no pr event"
what = fmt.Sprintf("%s %d %s %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType)
}
updatesMutex.Lock()
updates[0]++
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
ch <- true
return
}
// Check state change
changedState := false
if apiState != ghaState {
changedState = true
if ctx.Debug > 1 {
Printf("Updating PR '%v' state %s -> %s\n", ic, ghaState, apiState)
}
why = "changed pr state"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, ghaState, apiState)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, ghaState, apiState)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check title change
changedTitle := false
if apiTitle != ghaTitle {
changedTitle = true
if ctx.Debug > 1 {
Printf("Updating PR '%v' title %s -> %s\n", ic, ghaTitle, apiTitle)
}
why = "changed pr title"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, ghaTitle, apiTitle)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, ghaTitle, apiTitle)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check merged change
changedMerged := false
if (apiMerged == nil && ghaMerged != nil) || (apiMerged != nil && ghaMerged == nil) || (apiMerged != nil && ghaMerged != nil && *apiMerged != *ghaMerged) {
changedMerged = true
from := Null
if ghaMerged != nil {
from = fmt.Sprintf("%v", *ghaMerged)
}
to := Null
if apiMerged != nil {
to = fmt.Sprintf("%v", *apiMerged)
}
if ctx.Debug > 1 {
Printf("Updating PR '%v' merged %s -> %s\n", ic, from, to)
}
why = "changed pr merged"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check closed_at change
changedClosed := false
if (apiClosedAt == nil && ghaClosedAt != nil) || (apiClosedAt != nil && ghaClosedAt == nil) || (apiClosedAt != nil && ghaClosedAt != nil && ToYMDHMSDate(*apiClosedAt) != ToYMDHMSDate(*ghaClosedAt)) {
changedClosed = true
from := Null
if ghaClosedAt != nil {
from = fmt.Sprintf("%v", ToYMDHMSDate(*ghaClosedAt))
}
to := Null
if apiClosedAt != nil {
to = fmt.Sprintf("%v", ToYMDHMSDate(*apiClosedAt))
}
if ctx.Debug > 1 {
Printf("Updating PR '%v' closed_at %s -> %s\n", ic, from, to)
}
why = "changed pr closed at"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check merged_at change
changedMergedAt := false
if (apiMergedAt == nil && ghaMergedAt != nil) || (apiMergedAt != nil && ghaMergedAt == nil) || (apiMergedAt != nil && ghaMergedAt != nil && ToYMDHMSDate(*apiMergedAt) != ToYMDHMSDate(*ghaMergedAt)) {
changedMergedAt = true
from := Null
if ghaMergedAt != nil {
from = fmt.Sprintf("%v", ToYMDHMSDate(*ghaMergedAt))
}
to := Null
if apiMergedAt != nil {
to = fmt.Sprintf("%v", ToYMDHMSDate(*apiMergedAt))
}
if ctx.Debug > 1 {
Printf("Updating PR '%v' merged_at %s -> %s\n", ic, from, to)
}
why = "changed pr merged at"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check milestone change
changedMilestone := false
if (apiMilestoneID == nil && ghaMilestoneID != nil) || (apiMilestoneID != nil && ghaMilestoneID == nil) || (apiMilestoneID != nil && ghaMilestoneID != nil && *apiMilestoneID != *ghaMilestoneID) {
changedMilestone = true
from := Null
if ghaMilestoneID != nil {
from = fmt.Sprintf("%d", *ghaMilestoneID)
}
to := Null
if apiMilestoneID != nil {
to = fmt.Sprintf("%d", *apiMilestoneID)
}
if ctx.Debug > 1 {
Printf("Updating PR '%v' milestone %s -> %s\n", ic, from, to)
}
why = "changed pr milestone"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check assignee change
changedAssignee := false
if (apiAssigneeID == nil && ghaAssigneeID != nil) || (apiAssigneeID != nil && ghaAssigneeID == nil) || (apiAssigneeID != nil && ghaAssigneeID != nil && *apiAssigneeID != *ghaAssigneeID) {
changedAssignee = true
from := Null
if ghaAssigneeID != nil {
from = fmt.Sprintf("%d", *ghaAssigneeID)
}
to := Null
if apiAssigneeID != nil {
to = fmt.Sprintf("%d", *apiAssigneeID)
}
if ctx.Debug > 1 {
Printf("Updating PR '%v' assignee %s -> %s\n", ic, from, to)
}
why = "changed pr assignee"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// Check merged by change
changedMergedBy := false
if (apiMergedByID == nil && ghaMergedByID != nil) || (apiMergedByID != nil && ghaMergedByID == nil) || (apiMergedByID != nil && ghaMergedByID != nil && *apiMergedByID != *ghaMergedByID) {
changedMergedBy = true
from := Null
if ghaMergedByID != nil {
from = fmt.Sprintf("%d", *ghaMergedByID)
}
to := Null
if apiMergedByID != nil {
to = fmt.Sprintf("%d", *apiMergedByID)
}
if ctx.Debug > 1 {
Printf("Updating PR '%v' merged by %s -> %s\n", ic, from, to)
}
why = "changed pr merged by"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, from, to)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, from, to)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// API Assignees
AssigneesMap := make(map[int64]string)
for _, assignee := range pr.Assignees {
AssigneesMap[*assignee.ID] = *assignee.Login
}
assigneesAry := Int64Ary{}
for assignee := range AssigneesMap {
assigneesAry = append(assigneesAry, assignee)
}
sort.Sort(assigneesAry)
l = len(assigneesAry)
apiAssignees := ""
for i, assignee := range assigneesAry {
if i == l-1 {
apiAssignees += fmt.Sprintf("%d", assignee)
} else {
apiAssignees += fmt.Sprintf("%d,", assignee)
}
}
// GHA assignees
rowsA := QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select coalesce(string_agg(sub.assignee_id::text, ','), '') from "+
"(select assignee_id from gha_pull_requests_assignees where event_id = %s "+
"order by assignee_id) sub",
NValue(1),
),
ghaEventID,
)
defer func() { FatalOnError(rowsA.Close()) }()
ghaAssignees := ""
for rowsA.Next() {
FatalOnError(rowsA.Scan(&ghaAssignees))
}
FatalOnError(rowsA.Err())
changedAssignees := false
if ghaAssignees != apiAssignees {
if ctx.Debug > 1 {
Printf("Updating PR '%v' assignees to '%s', they were: '%s' (event_id %d)\n", ic, apiAssignees, ghaAssignees, ghaEventID)
}
changedAssignees = true
why = "changed pr assignees"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, ghaAssignees, apiAssignees)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, ghaAssignees, apiAssignees)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
// API Requested reviewers
RequestedReviewersMap := make(map[int64]string)
for _, reviewer := range pr.RequestedReviewers {
RequestedReviewersMap[*reviewer.ID] = *reviewer.Login
}
reviewersAry := Int64Ary{}
for reviewer := range RequestedReviewersMap {
reviewersAry = append(reviewersAry, reviewer)
}
sort.Sort(reviewersAry)
l = len(reviewersAry)
apiRequestedReviewers := ""
for i, reviewer := range reviewersAry {
if i == l-1 {
apiRequestedReviewers += fmt.Sprintf("%d", reviewer)
} else {
apiRequestedReviewers += fmt.Sprintf("%d,", reviewer)
}
}
// GHA reviewers
rowsRV := QuerySQLWithErr(
c,
ctx,
fmt.Sprintf(
"select coalesce(string_agg(sub.requested_reviewer_id::text, ','), '') from "+
"(select requested_reviewer_id from gha_pull_requests_requested_reviewers where event_id = %s "+
"order by requested_reviewer_id) sub",
NValue(1),
),
ghaEventID,
)
defer func() { FatalOnError(rowsRV.Close()) }()
ghaRequestedReviewers := ""
for rowsRV.Next() {
FatalOnError(rowsRV.Scan(&ghaRequestedReviewers))
}
FatalOnError(rowsRV.Err())
changedRequestedReviewers := false
if ghaRequestedReviewers != apiRequestedReviewers {
if ctx.Debug > 1 {
Printf("Updating PR '%v' requested reviewers to '%s', they were: '%s' (event_id %d)\n", ic, apiRequestedReviewers, ghaRequestedReviewers, ghaEventID)
}
changedRequestedReviewers = true
why = "changed pr reqested reviewers"
if manual {
what = fmt.Sprintf("%s %d: %s -> %s", ic.Repo, ic.Number, ghaRequestedReviewers, apiRequestedReviewers)
} else {
what = fmt.Sprintf("%s %d %s %s: %s -> %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, ghaRequestedReviewers, apiRequestedReviewers)
}
updatesMutex.Lock()
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
}
uidx := 1
why = "previous pr state the same"
if manual {
what = fmt.Sprintf("%s %d", ic.Repo, ic.Number)
} else {
what = fmt.Sprintf("%s %d %s %s", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType)
}
// Do the update if needed
changedAnything := changedMilestone || changedState || changedClosed || changedMerged || changedMergedAt || changedMergedBy || changedAssignee || changedTitle || changedAssignees || changedRequestedReviewers
if changedAnything {
uidx = 2
if manual {
FatalOnError(
ArtificialPREvent(
c,
ctx,
&ic,
&pr,
),
)
why = "previous pr state different"
what = fmt.Sprintf("%s %d", ic.Repo, ic.Number)
} else {
if ctx.Debug > 0 {
Printf("Warning: Exact artificial PR event (%v, %d) already exists with different state, skipping: '%v'\n", ic.CreatedAt, eventID, ic)
}
why = "collision and pr state differs"
what = fmt.Sprintf("%s %d %s %s: %d", ic.Repo, ic.Number, ToYMDHMSDate(ic.CreatedAt), ic.EventType, eventID)
if !ctx.SkipUpdateEvents {
why = "updated existing pr state"
FatalOnError(DeleteArtificialPREvent(c, ctx, &ic))
FatalOnError(ArtificialPREvent(c, ctx, &ic, &pr))
}
}
}
if ctx.Debug > 1 {
if manual {
Printf("PR Event exist (event_id: %d), added artificial: %v: '%v'\n", ghaEventID, changedAnything, ic)
} else {
Printf("PR Event for the same date (%v) exist (event_id: %d), added artificial: %v: '%v'\n", updatedAt, ghaEventID, changedAnything, ic)
}
}
updatesMutex.Lock()
updates[uidx]++
_, ok := infos[why]
if ok {
infos[why] = append(infos[why], what)
} else {
infos[why] = []string{what}
}
updatesMutex.Unlock()
// Synchronize go routine
ch <- changedAnything
}(ch, iid)
// go routine called with 'ch' channel to sync and tag index
nThreads++
for nThreads >= thrN {
<-ch
nThreads--
prc++
if prc%20 == 0 {
thrN = GetThreadsNum(ctx)
}
checked++
ProgressInfo(checked, nIssues, dtStart, &lastTime, time.Duration(10)*time.Second, "")
}
}
// Usually all work happens on '<-ch'
for nThreads > 0 {
<-ch
nThreads--
checked++
ProgressInfo(checked, nIssues, dtStart, &lastTime, time.Duration(10)*time.Second, "")
}
// Get RateLimits info
hint, _, rem, wait = GetRateLimits(gctx, ctx, gc, true)
if manual {
Printf(
"ghapi2db.go: Manually processed %d PRs (%d new PRs, existing: %d not needed, %d added): %+v API points remain, resets in %+v, hint key: %d\n",
checked, updates[0], updates[1], updates[2], rem, wait, hint,
)
} else {
Printf(
"ghapi2db.go: Automatically processed %d PRs (%d new PRs, existing: %d not needed, %d added): %+v API points remain, resets in %+v, hint key: %d\n",
checked, updates[0], updates[1], updates[2], rem, wait, hint,
)
}
// Info
outputInfo(infos, "PRs")
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/cncf/devstatscode.git
git@gitee.com:cncf/devstatscode.git
cncf
devstatscode
devstatscode
master

搜索帮助