001 (ns org.clojars.punit-naik.sql-batcher
002 (:require [clojure.java.jdbc :as db]
003 [clojure.string :as str]
004 [taoensso.timbre :as log]))
005
(defn update-or-delete-op
  "Classifies the large query as a `:delete` or an `:update` operation.

  Returns `:delete` when the statement begins with `delete from`. Leading
  whitespace, letter case and the amount of whitespace between the two
  keywords are ignored, so un-normalised input is classified correctly.
  Every other statement is reported as `:update`."
  [query]
  (if (re-find #"(?i)^\s*delete\s+from" query)
    :delete
    :update))
011
(defn table-name
  "Gets the name of the table from the large update/delete query.

  The query is trimmed and split on whitespace; the table is the third
  token of a delete (`delete from <table>`) and the second token of an
  update (`update <table>`). Throws if the query has too few tokens."
  [query]
  (let [;; Trim first: a leading blank would otherwise produce an empty
        ;; first token and shift every index by one.
        trimmed (str/trim query)
        tokens (str/split trimmed #"\s+")
        index (if (= :delete (update-or-delete-op trimmed)) 2 1)]
    (nth tokens index)))
019
(defn primary-key-column
  "Derives the primary key column for the query's table by appending
  `_id` to the table name, i.e. it assumes the `<table-name>_id` convention."
  [query]
  (let [table (table-name query)]
    (str table "_id")))
024
(defn remove-trailing-semicolon
  "Removes a single semicolon from the end of the string `s`.

  Whitespace after that semicolon (for example a final newline) is
  removed with it. A string that does not end in a semicolon is returned
  unchanged."
  [s]
  ;; `\z` anchors at the very end of the input, so only the last
  ;; semicolon can ever match.
  (str/replace s #";\s*\z" ""))
031
(defn where-clause
  "Extracts the where clause from the large update/delete query.

  Returns everything from the first `where` keyword to the end of the
  query, with runs of whitespace collapsed to single spaces, or nil when
  the query has no where clause. The clause may hold a single condition
  and may span several lines."
  [query]
  ;; (?s) lets `.` cross newlines; (?i) accepts any keyword case.
  (when-let [where (re-find #"(?is)\bwhere\s.*" query)]
    (str/trim (str/replace where #"\s+" " "))))
040
(defn join-clause
  "Extracts the join fragment (`<word> join <table> on ... = ...`) from
  the query. Anything from ` where` onwards is dropped and runs of
  whitespace are collapsed to single spaces. Returns nil when the query
  contains no such fragment."
  [query]
  (some-> (re-find #"(\w* join \w+ on .* \= .*)" query)
          first
          (str/replace #" where.*" "")
          (str/replace #"\s+" " ")))
049
(defn build-select-query
  "Builds the select statement that fetches one batch of primary keys
  for the rows targeted by the update/delete `query`.

  The result has the shape
  `select <pkey> from <table> [<join>] [where (<condition>)] order by <pkey> limit ?`
  where the single `?` placeholder is the batch size."
  [query pkey-column]
  (let [join (join-clause query)
        where (where-clause query)
        ;; Keep only the condition so it can be wrapped in parentheses:
        ;; extra `and` conditions that get prepended to the where clause
        ;; must not change the precedence of an `or` inside it.
        condition (some-> where (str/replace-first #"(?i)^where\s+" ""))]
    (str "select " pkey-column
         " from " (table-name query)
         (when join
           (str " " join))
         (when condition
           (str " where (" condition ")"))
         " order by " pkey-column
         " limit ?")))
062
(defn get-pkey-batch
  "Runs `select-query` against `db-spec` with `limit` as its only bind
  parameter. The keyword form of `pkey-column` is passed as `:row-fn`,
  so each row is reduced to the value stored under that key."
  [db-spec select-query limit pkey-column]
  (let [pkey-key (keyword pkey-column)
        sql-params [select-query limit]]
    (db/query db-spec sql-params {:row-fn pkey-key})))
069
(defn where-in-clause
  "Builds a parameterised `<column-name> in (?,?,...)` clause.

  Returns a vector whose first element is the SQL fragment, with one
  `?` per value, followed by the values themselves."
  [column-name values]
  (let [placeholders (->> values
                          (map (constantly "?"))
                          (str/join ","))
        sql (str column-name " in (" placeholders ")")]
    (into [sql] values)))
077
(def set-clause
  "Gets the set column names and their values from the update query.

  Returns a map of column keyword -> value. A value made only of digits
  is returned as a base-10 number (leading zeros are not treated as
  octal); any other value is returned as its trimmed SQL text, so string
  literals keep their quotes. Results are memoized per query string.

  Assignments are split on `,`, so a comma inside a value is not
  supported. The query is expected to contain a `set` clause."
  (memoize
   (fn [query]
     (let [parse-value (fn [text]
                         (if (re-matches #"[0-9]+" text)
                           (let [n (bigint text)]
                             (if (<= n Long/MAX_VALUE) (long n) n))
                           text))
           assignments (-> query
                           ;; Cut the where clause off, whatever whitespace
                           ;; (including newlines) surrounds the keyword.
                           (str/split #"(?i)\s+where\s+" 2)
                           first
                           ;; `\b` stops names such as `dataset` or `reset`
                           ;; from being taken for the `set` keyword.
                           (str/split #"(?i)\bset\s+" 2)
                           second
                           str/trim
                           (str/split #","))]
       (into {}
             (map (fn [assignment]
                    ;; Limit 2 keeps any `=` that is part of the value.
                    (let [[col-name value] (str/split assignment #"=" 2)]
                      [(keyword (str/trim col-name))
                       (parse-value (str/trim value))])))
             assignments)))))
097
(defn execute!
  "Executes the update/delete query for one batch of primary keys.

  The rows are addressed with `<pkey-column> in (...)` over `pkey-batch`.
  For an update, the values parsed from the query's set clause are turned
  into bind parameters first: a single-quoted SQL string literal is
  unquoted (with `''` unescaped to `'`) and the keyword `null` becomes
  nil, so the stored value matches what the original statement would
  have written. Any other value is passed through unchanged.

  Returns the first element of the `db/update!` / `db/delete!` result
  (the number of rows processed), or nil when `pkey-batch` is empty."
  [db-spec query pkey-column pkey-batch]
  (when (seq pkey-batch)
    (log/info (str "Executing query " query " for a batch of size " (count pkey-batch)))
    (let [table (keyword (table-name query))
          where (where-in-clause pkey-column pkey-batch)
          ->param (fn [value]
                    (cond
                      (not (string? value)) value
                      (re-matches #"(?i)null" value) nil
                      :else
                      ;; Only a lone literal is unquoted; an expression
                      ;; such as 'a' || 'b' does not match and stays as is.
                      (if-let [[_ inner] (re-matches #"'((?:[^']|'')*)'" value)]
                        (str/replace inner "''" "'")
                        value)))]
      (first
       (if (= :update (update-or-delete-op query))
         (db/update! db-spec
                     table
                     (into {}
                           (map (fn [[col value]] [col (->param value)]))
                           (set-clause query))
                     where)
         (db/delete! db-spec table where))))))
110
(defn add-extra-where-clauses
  "Restricts the select `query` to rows that the update has not been
  applied to yet, so that successive batches make progress.

  `update-map` maps column keywords to the SQL text (or number) of the
  value being set. A row still needs the update when at least one of
  those columns differs from its target, so the per-column checks are
  combined with `or`. A plain `!=` is never true for NULL, hence the
  explicit `is null` check; a target of `null` is checked with
  `is not null`.

  The guard is placed after the first `where` keyword. When the query
  has no where clause, one is inserted before `order by` / `limit`, or
  at the end of the query. An empty `update-map` leaves the query
  unchanged."
  [query update-map]
  (if (empty? update-map)
    query
    (let [pending (->> update-map
                       (map (fn [[col value]]
                              (let [column (name col)]
                                (if (and (string? value)
                                         (re-matches #"(?i)null" value))
                                  (str column " is not null")
                                  (str column " is null or " column " != " value)))))
                       (str/join " or "))
          guard (str "(" pending ")")]
      ;; Function replacements are inserted literally, so `$` or `\` in a
      ;; value cannot be mistaken for a regex group reference.
      (if (re-find #"(?i)\bwhere\b" query)
        (str/replace-first query
                           #"(?i)\bwhere\b"
                           (fn [kw] (str kw " " guard " and")))
        (str/replace-first query
                           #"(?i)\s+order\s+by\b|\s+limit\b|\s*$"
                           (fn [tail] (str " where " guard tail)))))))
119
(defn start-batch-processing!
  "Runs the large update/delete `query` against `db-spec` in batches of
  at most `batch-size` rows.

  Each iteration selects a batch of primary keys and applies the query
  to exactly those rows; processing stops once a select returns no rows.

  Optional arguments:
    pkey-column           - primary key column; defaults to `<table>_id`
    select-query-override - select statement to use for fetching the
                            primary keys; any `limit <n>` in it is
                            replaced by a `limit ?` placeholder

  For an update, the select is further restricted to rows whose set
  columns do not hold their target values yet.

  NOTE(review): the whole statement is lower-cased before parsing, which
  also lower-cases any string literals it contains."
  [db-spec query batch-size & [pkey-column select-query-override]]
  (log/info "Starting the batch processing of large query:" query)
  (let [;; Normalise the query before anything is derived from it, so the
        ;; default primary key column is computed from the same text that
        ;; every other helper parses.
        query (-> query
                  str/trim
                  remove-trailing-semicolon
                  str/trim
                  str/lower-case)
        pkey-column (str/lower-case (or pkey-column (primary-key-column query)))
        select-query (cond-> (if (seq select-query-override)
                               (-> select-query-override
                                   remove-trailing-semicolon
                                   str/lower-case
                                   (str/replace #"limit [0-9]+" "")
                                   (str " limit ?"))
                               (build-select-query query pkey-column))
                       (not= :delete (update-or-delete-op query))
                       (add-extra-where-clauses (set-clause query)))]
    (loop [total-rows-processed 0
           finished? false]
      (if finished?
        (do (log/info "Total number of rows processed:" total-rows-processed)
            (log/info "Finished batch processing of query:" query))
        (let [batch (get-pkey-batch db-spec select-query batch-size pkey-column)
              batch-count (count batch)]
          ;; `execute!` returns nil for an empty batch, hence the `or`.
          (recur (+ total-rows-processed (or (execute! db-spec query pkey-column batch) 0))
                 (zero? batch-count)))))))