diff --git a/official/update.rkt b/official/update.rkt index d0847f1..8597ab7 100644 --- a/official/update.rkt +++ b/official/update.rkt @@ -3,31 +3,94 @@ racket/function racket/system racket/package + racket/port racket/match + racket/async-channel pkg/private/stage (prefix-in pkg: pkg/lib) "common.rkt" "notify.rkt" "static.rkt") +(define NUM-THREADS 4) + (define (update-all) (update-checksums #f (package-list))) (define (update-pkgs pkgs) (update-checksums #t pkgs)) +(define (t:log! . args) + (thread-send current-display-thd + (format "~a: ~a" + (current-thd-no) + (with-output-to-string (λ () (apply log! args)))))) + +(define current-thd-no (make-parameter #f)) +(define current-display-thd #f) + (define (update-checksums force? pkgs) - (filter (λ (pkg-name) - (cond - [(package-exists? pkg-name) - (update-checksum force? pkg-name)] - [else (log! "update-checksums: invariant broken; ~a doesn't exist" pkg-name) - ;; considered not update - #f])) - pkgs)) + (define updated-pkgs '()) + + (define collector-thd + (thread + (λ () + (let loop () + (match (thread-receive) + ['terminate (void)] + [pkg-name + (set! updated-pkgs (cons pkg-name updated-pkgs)) + (loop)]))))) + + (set! current-display-thd + (thread + (λ () + (let loop () + (match (thread-receive) + ['terminate (void)] + [s (display s) + (loop)]))))) + + (define ch (make-async-channel)) + + (define thds + (for/list ([i NUM-THREADS]) + (thread + (λ () + (parameterize ([current-thd-no i]) + (let loop () + (match (async-channel-get ch) + ['terminate (void)] + [pkg-name + (when (update-checksum force? pkg-name) + (thread-send collector-thd pkg-name)) + (loop)]))))))) + + (for ([pkg-name pkgs]) + (cond + [(package-exists? pkg-name) + (async-channel-put ch pkg-name)] + [else + (parameterize ([current-thd-no 'main]) + (t:log! "update-checksums: invariant broken; ~a doesn't exist" pkg-name))])) + + (for ([thd thds]) + (async-channel-put ch 'terminate)) + + (for ([thd thds]) + (thread-wait thd)) + + (thread-send current-display-thd 'terminate) + (thread-wait current-display-thd) + (set! current-display-thd #f) + + (thread-send collector-thd 'terminate) + (thread-wait collector-thd) + + updated-pkgs) ;; precondition: pkg-name must exist (define (update-checksum force? pkg-name) - (log! "update-checksum ~v ~v" force? pkg-name) + (t:log! "update-checksum ~v ~v" force? pkg-name) (with-handlers ([exn:fail? (λ (x) @@ -79,14 +142,14 @@ (when (or force? (>= (- now last) (* 1 60 60)) (not old-checksum)) - (log! "\tupdating ~a" pkg-name) + (t:log! "\tupdating ~a" pkg-name) (define new-checksum (package-url->checksum (package-ref i 'source) #:pkg-name pkg-name)) (unless (equal? new-checksum old-checksum) - (log! "\told: ~v" old-checksum) - (log! "\tnew: ~v" new-checksum) + (t:log! "\told: ~v" old-checksum) + (t:log! "\tnew: ~v" new-checksum) (set! changed? #t)) (package-begin (define* i @@ -105,8 +168,8 @@ (hash-ref vi 'source "") #:pkg-name pkg-name)) (unless (equal? new-checksum old-checksum) - (log! "\t~a old: ~v" vi old-checksum) - (log! "\t~a new: ~v" vi new-checksum) + (t:log! "\t~a old: ~v" vi old-checksum) + (t:log! "\t~a new: ~v" vi new-checksum) (set! changed? #t)) (values v (hash-set vi 'checksum @@ -130,12 +193,12 @@ (hash-set next-i 'last-updated now))])) (define* i (hash-set i 'checksum-error #f)) - (log! "\twriting with checksum ~v" (hash-ref i 'checksum)) + (t:log! "\twriting with checksum ~v" (hash-ref i 'checksum)) (package-info-set! pkg-name i))) changed?)) (define (update-from-content i) - (log! "\tgetting package content for ~v" (hash-ref i 'name)) + (t:log! "\tgetting package content for ~v" (hash-ref i 'name)) (match-define-values (checksum module-paths (list dependencies implies collection)) (pkg:get-pkg-content