Skip to content

Commit 343508e

Browse files
committed
Android DDP
1 parent 58c686d commit 343508e

File tree

5 files changed

+361
-76
lines changed

5 files changed

+361
-76
lines changed

android/app/src/main/java/chat/rocket/reactnative/notification/Ejson.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class Ejson {
2525
private static final String TAG = "RocketChat.Ejson";
2626
private static final String TOKEN_KEY = "reactnativemeteor_usertoken-";
2727

28-
String host;
28+
public String host;
2929
String rid;
3030
String type;
3131
Sender sender;
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package chat.rocket.reactnative.voip
2+
3+
import android.os.Handler
4+
import android.os.Looper
5+
import android.util.Log
6+
import okhttp3.OkHttpClient
7+
import okhttp3.Request
8+
import okhttp3.Response
9+
import okhttp3.WebSocket
10+
import okhttp3.WebSocketListener
11+
import org.json.JSONArray
12+
import org.json.JSONObject
13+
import java.util.concurrent.TimeUnit
14+
15+
/**
16+
* Minimal DDP WebSocket client for listening to Rocket.Chat media-signal events from native Android.
17+
* Only implements the subset needed to detect call hangup: connect, login, subscribe, and ping/pong.
18+
*/
19+
class DDPClient {
20+
21+
companion object {
22+
private const val TAG = "RocketChat.DDPClient"
23+
}
24+
25+
private var webSocket: WebSocket? = null
26+
private var client: OkHttpClient? = null
27+
private var sendCounter = 0
28+
private var isConnected = false
29+
private val mainHandler = Handler(Looper.getMainLooper())
30+
31+
private val pendingCallbacks = mutableMapOf<String, (JSONObject) -> Unit>()
32+
private var connectedCallback: ((Boolean) -> Unit)? = null
33+
34+
var onCollectionMessage: ((JSONObject) -> Unit)? = null
35+
36+
fun connect(host: String, callback: (Boolean) -> Unit) {
37+
val wsUrl = buildWebSocketURL(host)
38+
39+
Log.d(TAG, "Connecting to $wsUrl")
40+
41+
val httpClient = OkHttpClient.Builder()
42+
.pingInterval(30, TimeUnit.SECONDS)
43+
.build()
44+
client = httpClient
45+
46+
val request = Request.Builder().url(wsUrl).build()
47+
48+
webSocket = httpClient.newWebSocket(request, object : WebSocketListener() {
49+
override fun onOpen(webSocket: WebSocket, response: Response) {
50+
Log.d(TAG, "WebSocket opened")
51+
val connectMsg = JSONObject().apply {
52+
put("msg", "connect")
53+
put("version", "1")
54+
put("support", JSONArray().apply {
55+
put("1"); put("pre2"); put("pre1")
56+
})
57+
}
58+
webSocket.send(connectMsg.toString())
59+
waitForConnected(10_000L, callback)
60+
}
61+
62+
override fun onMessage(webSocket: WebSocket, text: String) {
63+
handleMessage(text)
64+
}
65+
66+
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
67+
Log.e(TAG, "WebSocket failure: ${t.message}")
68+
mainHandler.post { callback(false) }
69+
}
70+
71+
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
72+
Log.d(TAG, "WebSocket closed: $code $reason")
73+
}
74+
})
75+
}
76+
77+
fun login(token: String, callback: (Boolean) -> Unit) {
78+
val msg = nextMessage("method").apply {
79+
put("method", "login")
80+
put("params", JSONArray().apply {
81+
put(JSONObject().apply { put("resume", token) })
82+
})
83+
}
84+
85+
val msgId = msg.getString("id")
86+
87+
synchronized(pendingCallbacks) {
88+
pendingCallbacks[msgId] = { data ->
89+
synchronized(pendingCallbacks) { pendingCallbacks.remove(msgId) }
90+
val hasError = data.has("error")
91+
if (hasError) {
92+
Log.e(TAG, "Login failed: ${data.opt("error")}")
93+
} else {
94+
Log.d(TAG, "Login succeeded")
95+
}
96+
mainHandler.post { callback(!hasError) }
97+
}
98+
}
99+
100+
if (!send(msg)) {
101+
mainHandler.post { callback(false) }
102+
}
103+
}
104+
105+
fun subscribe(name: String, params: JSONArray, callback: (Boolean) -> Unit) {
106+
val msg = nextMessage("sub").apply {
107+
put("name", name)
108+
put("params", params)
109+
}
110+
111+
val msgId = msg.getString("id")
112+
113+
synchronized(pendingCallbacks) {
114+
pendingCallbacks[msgId] = {
115+
synchronized(pendingCallbacks) { pendingCallbacks.remove(msgId) }
116+
Log.d(TAG, "Subscribed to $name")
117+
mainHandler.post { callback(true) }
118+
}
119+
}
120+
121+
if (!send(msg)) {
122+
mainHandler.post { callback(false) }
123+
}
124+
}
125+
126+
fun disconnect() {
127+
Log.d(TAG, "Disconnecting")
128+
isConnected = false
129+
synchronized(pendingCallbacks) { pendingCallbacks.clear() }
130+
connectedCallback = null
131+
onCollectionMessage = null
132+
webSocket?.close(1000, null)
133+
webSocket = null
134+
client?.dispatcher?.executorService?.shutdown()
135+
client = null
136+
}
137+
138+
private fun nextMessage(msg: String): JSONObject {
139+
sendCounter++
140+
return JSONObject().apply {
141+
put("msg", msg)
142+
put("id", "ddp-$sendCounter")
143+
}
144+
}
145+
146+
private fun send(json: JSONObject): Boolean {
147+
val ws = webSocket ?: return false
148+
return ws.send(json.toString())
149+
}
150+
151+
private fun waitForConnected(timeoutMs: Long, callback: (Boolean) -> Unit) {
152+
connectedCallback = callback
153+
mainHandler.postDelayed({
154+
val cb = connectedCallback ?: return@postDelayed
155+
connectedCallback = null
156+
Log.e(TAG, "Connect timeout")
157+
cb(false)
158+
}, timeoutMs)
159+
}
160+
161+
private fun handleMessage(text: String) {
162+
val json = try {
163+
JSONObject(text)
164+
} catch (e: Exception) {
165+
return
166+
}
167+
168+
when (json.optString("msg")) {
169+
"connected" -> {
170+
isConnected = true
171+
mainHandler.removeCallbacksAndMessages(null)
172+
val cb = connectedCallback
173+
connectedCallback = null
174+
cb?.let { mainHandler.post { it(true) } }
175+
}
176+
177+
"ping" -> {
178+
send(JSONObject().apply { put("msg", "pong") })
179+
}
180+
181+
"result" -> {
182+
val id = json.optString("id")
183+
val cb = synchronized(pendingCallbacks) { pendingCallbacks[id] }
184+
cb?.invoke(json)
185+
}
186+
187+
"ready" -> {
188+
val subs = json.optJSONArray("subs")
189+
val first = subs?.optString(0)
190+
if (first != null) {
191+
val cb = synchronized(pendingCallbacks) { pendingCallbacks[first] }
192+
cb?.invoke(json)
193+
}
194+
}
195+
196+
"changed", "added", "removed" -> {
197+
onCollectionMessage?.invoke(json)
198+
}
199+
200+
"nosub" -> {
201+
val id = json.optString("id")
202+
val cb = synchronized(pendingCallbacks) { pendingCallbacks[id] }
203+
cb?.invoke(json)
204+
}
205+
206+
else -> {
207+
if (json.has("collection")) {
208+
onCollectionMessage?.invoke(json)
209+
}
210+
}
211+
}
212+
}
213+
214+
private fun buildWebSocketURL(host: String): String {
215+
var normalizedHost = host.trimEnd('/')
216+
217+
val useSsl: Boolean
218+
when {
219+
normalizedHost.startsWith("https://") -> {
220+
useSsl = true
221+
normalizedHost = normalizedHost.removePrefix("https://")
222+
}
223+
normalizedHost.startsWith("http://") -> {
224+
useSsl = false
225+
normalizedHost = normalizedHost.removePrefix("http://")
226+
}
227+
else -> {
228+
useSsl = true
229+
}
230+
}
231+
232+
val scheme = if (useSsl) "wss" else "ws"
233+
return "$scheme://$normalizedHost/websocket"
234+
}
235+
}

android/app/src/main/java/chat/rocket/reactnative/voip/VoipModule.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,9 @@ class VoipModule(reactContext: ReactApplicationContext) : NativeVoipSpec(reactCo
111111
Log.d(TAG, "registerVoipToken called (no-op on Android)")
112112
}
113113

114-
// No-op on Android - native DDP listener is iOS only
115114
override fun stopNativeDDPClient() {
116-
Log.d(TAG, "stopNativeDDPClient called (no-op on Android)")
115+
Log.d(TAG, "stopNativeDDPClient called, stopping native DDP client")
116+
VoipNotification.stopDDPClient()
117117
}
118118

119119
/**

0 commit comments

Comments
 (0)