001  (ns org.clojars.punit-naik.sql-batcher
002    (:require [clojure.java.jdbc :as db]
003              [clojure.string :as str]
004              [taoensso.timbre :as log]))
005  
006  (defn update-or-delete-op
007    "Checks if the large query is doing an update or delete operation"
008    [query]
009    (if (str/starts-with? query "delete from")
010      :delete :update))
011  
012  (defn table-name
013    "Gets the name of the table from the large update/delete query"
014    [query]
015    (let [query-parts (str/split query #"\s+")]
016      (nth query-parts
017           (if (= :delete (update-or-delete-op query))
018             2 1))))
019  
020  (defn primary-key-column
021    "Assumes the primary key column to be `<table-name>_id`"
022    [query]
023    (str (table-name query) "_id"))
024  
025  (defn remove-trailing-semicolon
026    "Removes semi-colon from the end of a string"
027    [s]
028    (if (str/ends-with? s ";")
029      (str/join (butlast s))
030      s))
031  
032  (defn where-clause
033    "Extracts where clause from the large update/delete query"
034    [query]
035    (when-let [where (-> (re-matcher #"(where .* (and|or) .*)"
036                                     query)
037                         re-find
038                         first)]
039      (str/replace where #"\s+" " ")))
040  
041  (defn join-clause
042    [query]
043    (when-let [join (-> (re-matcher #"(\w* join \w+ on .* \= .*)"
044                                    query)
045                        re-find
046                        first)]
047      (-> (str/replace join #" where.*" "")
048          (str/replace #"\s+" " "))))
049  
050  (defn build-select-query
051    [query pkey-column]
052    (let [join (join-clause query)
053          where (where-clause query)]
054      (str "select " pkey-column
055           " from " (table-name query)
056           (when join
057             (str " " join))
058           (when where
059             (str " " where))
060           " order by " pkey-column
061           " limit ?")))
062  
063  (defn get-pkey-batch
064    [db-spec select-query limit pkey-column]
065    (db/query
066     db-spec
067     [select-query limit]
068     {:row-fn (keyword pkey-column)}))
069  
070  (defn where-in-clause
071    [column-name values]
072    (into [(str column-name
073                " in ("
074                (str/join "," (repeat (count values) "?"))
075                ")")]
076          values))
077  
078  (def set-clause
079    "Gets the set column names and their values from the update query"
080    (memoize
081     (fn [query]
082       (-> query
083           (str/split #" where ")
084           first
085           (str/split #"set ")
086           second
087           str/trim
088           str/trim-newline
089           (str/split #",")
090           (->> (map
091                 (fn [s]
092                   (let [[col-name value] (str/split s #"\=")
093                         value (->> value str/trim-newline str/trim)]
094                     [(->> col-name str/trim-newline str/trim keyword)
095                      (if (re-matches #"[0-9]+" value) (read-string value) value)])))
096                (into {}))))))
097  
098  (defn execute!
099    "Executes the update/delete query
100     Returns the number of rows processed"
101    [db-spec query pkey-column pkey-batch]
102    (when (seq pkey-batch)
103      (log/info (str "Executing query " query " for a batch of size " (count pkey-batch)))
104      (let [table (keyword (table-name query))
105            where (where-in-clause pkey-column pkey-batch)]
106        (first
107         (if (= :update (update-or-delete-op query))
108           (db/update! db-spec table (set-clause query) where)
109           (db/delete! db-spec table where))))))
110  
111  (defn add-extra-where-clauses
112    [query update-map]
113    (reduce (fn [q [col value]]
114              (str/replace
115               q
116               "where"
117               (str "where " (name col) " != " value " and ")))
118            query update-map))
119  
120  (defn start-batch-processing!
121    [db-spec query batch-size & [pkey-column select-query-override]]
122    (log/info "Starting the batch processing of large query:" query)
123    (let [pkey-column (str/lower-case (or pkey-column (primary-key-column query)))
124          query (-> query str/lower-case remove-trailing-semicolon)
125          select-query (cond-> (if (seq select-query-override)
126                                 (-> select-query-override
127                                     remove-trailing-semicolon
128                                     str/lower-case
129                                     (str/replace #"limit [0-9]+" "")
130                                     (str " limit ?"))
131                                 (build-select-query query pkey-column))
132                         (not= :delete (update-or-delete-op query))
133                         (add-extra-where-clauses (set-clause query)))]
134      (loop [total-rows-processed 0
135             finished? false]
136        (if finished?
137          (do (log/info "Total number of rows processed:" total-rows-processed)
138              (log/info "Finished batch processing of query:" query))
139          (let [batch (get-pkey-batch db-spec select-query batch-size pkey-column)
140                batch-count (count batch)]
141            (recur (+ total-rows-processed (or (execute! db-spec query pkey-column batch) 0))
142                   (zero? batch-count)))))))