# WebSockets

# Mapping

Um festzulegen, welche WebSockets unter welchen Pfaden bereitgestellt werden, wird eine @Configuration Klasse mit einem @Bean vom Typ HandlerMapping (hier der Subtyp SimpleUrlHandlerMapping) angelegt. Diese wird zur Laufzeit automatisch von Spring gefunden. Dort werden in einer Map zu Pfaden die jeweiligen WebSocket-Handler zugeordnet. Die WebSocket-Handler sind ebenfalls Beans und werden daher im Konstruktor angefragt, wo Spring sie automatisch übergibt.

@Configuration
class WebSocketConfig(
  private val echoWebSocket: EchoWebSocket
) {
  @Bean
  fun webSocketHandlers() = SimpleUrlHandlerMapping(
    mapOf(
      "/echo-socket" to echoWebSocket
    ),
    1
  )
}

Die 1 ist der Wert für den order Parameter dieses HandlerMappings, welcher standardmäßig auf Ordered.LOWEST_PRECEDENCE (entspricht Integer.MAX_VALUE) steht. Kleinere Werte werden bevorzugt behandelt. Der von Spring bereitgestellte Handler für statische Ressourcen reagiert standardmäßig auf /** (also alles) und hat sein order auf Ordered.LOWEST_PRECEDENCE - 1 gesetzt, würde so also einen Aufruf von /echo-socket wegfangen. Daher setzen wir order von unserem Handler auf irgendeinen kleineren Wert, damit unser Handler zuerst gefragt wird.

# Handler

Ein WebSocket-Handler ist eine @Component Klasse, die das WebSocketHandler Interface implementiert. Es ist lediglich die Methode handle mit Leben zu befüllen. Diese wird aufgerufen, sobald eine WebSocket Verbindung aufgebaut wurde. Die Verbindung wird automatisch geschlossen, wenn die handle Methode fertig ist – das erfordert ein wenig Umdenken zum reaktiven Programmierstil.

@Component
class EchoWebSocket : WebSocketHandler {
  override fun handle(session: WebSocketSession): Mono<Void> { /* ... */ }
}

session stellt die Methoden textMessage(), binaryMessage(), send(), receive() und close() zur Verfügung. Bis auf textMessage() und binaryMessage() arbeiten die Methoden mit reaktiven Datentypen wie Mono (quasi eine Promise) und Flux/Publisher (im Prinzip ein lesender / schreibender Stream).

Von session.receive() erhält man ein Flux<WebSocketMessage>, also einen Stream, der eingehende Nachrichten enthält. Dieser wird terminiert, sobald die WebSocket-Verbindung geschlossen wurde, egal von welcher Seite. Analog dazu gibt man an session.send() einen Stream, der ausgehende Nachrichten produziert (einen Publisher<WebSocketMessage>).

Diese beiden Sachen kann man kombinieren: Man übergibt send() einen Stream, der von dem receive() Stream abgeleitet ist. Abgeleitet bedeutet hier, dass zu einer eingehenden Nachricht eine ausgehende Nachricht (oder mehrere, oder keine) produziert wird – man sagt auch, der Stream wird gemappt.

Da send() ein Mono zurückgibt, welches fertig ist, sobald alle zu sendenden Nachrichten gesendet wurden, die aber endlos lange produziert werden (solange der Stream von eingehenden Nachrichten offen bleibt und neue Nachrichten liefert, auf die mit ausgehenden Nachrichten reagiert wird), ist dieses Mono auch gleichzeitig der Rückgabewert von handle(). Das bedeutet, handle() ist fertig, sobald der ausgehende Stream fertig ist, der wiederum fertig ist, sobald der eingehende Stream fertig ist. Fertig.

Die WebSocket-Session startet mit Aufruf von handle(). Um mitzubekommen, wann sie endet, kann der eingehende Stream mit doAfterTerminate() beobachtet werden.

Ein Beispiel der besprochenen Funktionen sieht so aus:

@Component
class EchoWebSocket : WebSocketHandler {
  override fun handle(session: WebSocketSession): Mono<Void> {
    logger.info { "WebSocket#${session.id} connected" }

    // das Senden starten und auf die Beendigung dessen warten lassen
    return session.send(
      // das Empfangen starten
      session.receive()
        .doAfterTerminate {
          // WebSocket wurde geschlossen, noch auf den CloseStatus warten und diesen ausgeben
          session.closeStatus().subscribe { status ->
            logger.info { "WebSocket#${session.id} closed ($status)" }
          }
        }
        // zu eingehenden Nachrichten die ausgehenden Nachrichten produzieren
        .flatMap {
          // eine Nachricht wurde empfangen, wir gehen von Text aus
          val text = it.payloadAsText
          logger.info { "WebSocket#${session.id} message: $text" }

          // schauen, was genau geschickt wurde
          // und entsprechenden Stream von ausgehenden Nachrichten zurückgeben
          when (text) {
            "ignore" -> {
              // keine ausgehende Nachricht produzieren
              Flux.empty()
            }
            "multi" -> {
              // mehrere ausgehende Nachrichten produzieren
              Flux.just(
                session.textMessage("first"),
                session.textMessage("second")
              )
            }
            else -> {
              // eine ausgehende Nachrichten produzieren
              Flux.just(session.textMessage("hello, you said: $text"))
            }
          }
        })
  }
}