|
19 | 19 | (finally |
20 | 20 | (.setAutoCommit conn# true))))) |
21 | 21 |
|
22 | | -(defn execute! [conn sql] |
| 22 | +(defn execute! [^Connection conn sql] |
23 | 23 | (with-open [stmt (.createStatement conn)] |
24 | 24 | (.execute stmt sql))) |
25 | 25 |
|
26 | | -(defmulti store-impl |
27 | | - (fn [conn opts addr+data-seq] |
28 | | - (:dbtype opts))) |
| 26 | +(defmulti upsert-dml :dbtype) |
29 | 27 |
|
30 | | -(defmethod store-impl :h2 [^Connection conn opts addr+data-seq] |
31 | | - (let [{:keys [table binary? freeze-str freeze-bytes batch-size]} opts |
32 | | - sql (str "merge into " table " key (addr) values (?, ?)")] |
33 | | - (with-tx conn |
34 | | - (with-open [stmt (.prepareStatement conn sql)] |
35 | | - (doseq [part (partition-all batch-size addr+data-seq)] |
36 | | - (doseq [[addr data] part] |
37 | | - (.setLong stmt 1 addr) |
38 | | - (if binary? |
39 | | - (let [content ^bytes (freeze-bytes data)] |
40 | | - (.setBytes stmt 2 content)) |
41 | | - (let [content ^String (freeze-str data)] |
42 | | - (.setString stmt 2 content))) |
43 | | - (.addBatch stmt)) |
44 | | - (.executeBatch stmt)))))) |
| 28 | +(defmethod upsert-dml :h2 [opts] |
| 29 | + (str "merge into " (:table opts) " key (addr) values (?, ?)")) |
| 30 | + |
| 31 | +(defmethod upsert-dml :mysql [opts] |
| 32 | + (str |
| 33 | + "insert into " (:table opts) " (addr, content) " |
| 34 | + "values (?, ?) " |
| 35 | + "ON DUPLICATE KEY UPDATE content = ?")) |
| 36 | + |
| 37 | +(defmethod upsert-dml :default [opts] |
| 38 | + (str |
| 39 | + "insert into " (:table opts) " (addr, content) " |
| 40 | + "values (?, ?) " |
| 41 | + "on conflict(addr) do update set content = ?")) |
45 | 42 |
|
46 | | -(defmethod store-impl :default [^Connection conn opts addr+data-seq] |
| 43 | +(defn store-impl [^Connection conn opts addr+data-seq] |
47 | 44 | (let [{:keys [table binary? freeze-str freeze-bytes batch-size]} opts |
48 | | - sql (str |
49 | | - "insert into " table " (addr, content) " |
50 | | - "values (?, ?) " |
51 | | - "on conflict(addr) do update set content = ?")] |
| 45 | + sql (upsert-dml opts) |
| 46 | + cnt (count (re-seq #"\?" sql))] |
52 | 47 | (with-tx conn |
53 | 48 | (with-open [stmt (.prepareStatement conn sql)] |
54 | 49 | (doseq [part (partition-all batch-size addr+data-seq)] |
|
57 | 52 | (if binary? |
58 | 53 | (let [content ^bytes (freeze-bytes data)] |
59 | 54 | (.setBytes stmt 2 content) |
60 | | - (.setBytes stmt 3 content)) |
| 55 | + (when (= 3 cnt) |
| 56 | + (.setBytes stmt 3 content))) |
61 | 57 | (let [content ^String (freeze-str data)] |
62 | 58 | (.setString stmt 2 content) |
63 | | - (.setString stmt 3 content))) |
| 59 | + (when (= 3 cnt) |
| 60 | + (.setString stmt 3 content)))) |
64 | 61 | (.addBatch stmt)) |
65 | 62 | (.executeBatch stmt)))))) |
66 | 63 |
|
|
108 | 105 | " (addr BIGINT primary key, " |
109 | 106 | " content " (if binary? "BINARY VARYING" "CHARACTER VARYING") ")")) |
110 | 107 |
|
| 108 | +(defmethod ddl :mysql [{:keys [table binary?]}] |
| 109 | + (str |
| 110 | + "create table if not exists " table |
| 111 | + " (addr BIGINT primary key, " |
| 112 | + " content " (if binary? "LONGBLOB" "LONGTEXT") ")")) |
| 113 | + |
111 | 114 | (defmethod ddl :postgresql [{:keys [table binary?]}] |
112 | 115 | (str |
113 | 116 | "create table if not exists " table |
|
0 commit comments