Integrating Angular 2 and SignalR - Part 2 of 2

Update - Nov 10, 2016

This example has now been upgraded to Angular 2.1.2 thanks to a pull request from boulc! Many thanks to boulc for taking the time to update this and share the changes back with me. I've updated the relevant sample code below to reflect this change.


This post is part of a two-part series on integrating Angular 2 and SignalR. If you haven't read the first post, please check it out:

In the first post of this series we walked through how to build a relatively simple API that used SignalR to provide real-time updates to clients calling API methods. In this article we'll tackle the Angular 2 side of things. All of the code shown is available in my GitHub repo.

To set the stage we're going to build a simple Angular 2 app that can invoke the long and short tasks of the API, but also receive the status updates in real-time using SignalR. Visually the application will look like this:

We have a couple of components here that let us call the API using a button, and then any status updates related to that task are shown in the textareas. We also have some top-level information shown about the state of the SignalR connection.

The Angular 2 Components

To build this app I tried to keep things as simple as possible. There are only two components used:

  1. The top-level "app" component
  2. A "task" component

[Image: the components shown visually]

Now, these rely on a single service that exposes the SignalR functionality, but we'll come back to that later on in this article.

The TaskComponent class

Here's what our task component looks like:

import {Component, OnInit, Input} from "@angular/core";  
import {Http, Response} from "@angular/http";

import {ChannelService, ChannelEvent} from "./services/channel.service";

class StatusEvent {  
    State: string;
    PercentComplete: number;
}

@Component({
    selector: 'task',
    template: `
        <div>
            <h4>Task component bound to '{{eventName}}'</h4>
        </div>

        <div class="commands">
            <textarea 
                class="console"
                cols="50" 
                rows="15"
                disabled
                [value]="messages"></textarea> 

            <div class="commands__input">
                <button (click)="callApi()">Call API</button>
            </div>
        </div>
    `
})
export class TaskComponent implements OnInit {  
    @Input() eventName: string;
    @Input() apiUrl: string;

    messages = "";

    private channel = "tasks";

    constructor(
        private http: Http,
        private channelService: ChannelService
    ) {

    }

    ngOnInit() {
        // Get an observable for events emitted on this channel
        //
        this.channelService.sub(this.channel).subscribe(
            (x: ChannelEvent) => {
                switch (x.Name) {
                    case this.eventName: { this.appendStatusUpdate(x); }
                }
            },
            (error: any) => {
                console.warn("Attempt to join channel failed!", error);
            }
        )
    }


    private appendStatusUpdate(ev: ChannelEvent): void {
        // Just prepend this to the messages string shown in the textarea
        //
        let date = new Date();
        switch (ev.Data.State) {
            case "starting": {
                this.messages = `${date.toLocaleTimeString()} : starting\n` + this.messages;
                break;
            }

            case "complete": {
                this.messages = `${date.toLocaleTimeString()} : complete\n` + this.messages;
                break;
            }

            default: {
                this.messages = `${date.toLocaleTimeString()} : ${ev.Data.State} : ${ev.Data.PercentComplete} % complete\n` + this.messages;
            }
        }
    }

    callApi() {
        this.http.get(this.apiUrl)
            .map((res: Response) => res.json())
            .subscribe((message: string) => { console.log(message); });
    }
}

We can see it has a pretty vanilla template, has two @Input() parameters, and is injected with both the Http and ChannelService services. The input parameters drive the behavior of this component like so:

  • eventName - Determines what events are shown in the status window
  • apiUrl - Determines what endpoint is called when the button is clicked

We also have code within ngOnInit() to react to events emitted by the channel service, but we'll come back to that later.

The AppComponent

The main application component is pretty simple. It's used mostly to start the SignalR connection and, of course, to contain the child components. The code looks like this:

import {Component, OnInit} from '@angular/core';  
import {Observable} from "rxjs/Observable";

import {ChannelService, ConnectionState} from "./services/channel.service";

@Component({
    selector: 'my-app',
    template: `
        <div>
            <h3>SignalR w/ Angular 2 demo</h3>
        </div>

        <div>
            <span>Connection state: {{connectionState$ | async}}</span>
        </div>

        <div class="flex-row">
            <task class="flex"
                [eventName]="'longTask.status'"
                [apiUrl]="'http://localhost:9123/tasks/long'"></task>
            <task class="flex"
                [eventName]="'shortTask.status'"
                [apiUrl]="'http://localhost:9123/tasks/short'"></task>
        </div>
     `
})
export class AppComponent implements OnInit {

    // An internal "copy" of the connection state stream used because
    //  we want to map the values of the original stream. If we didn't 
    //  need to do that then we could use the service's observable 
    //  right in the template.
    //   
    connectionState$: Observable<string>;

    constructor(
        private channelService: ChannelService
    ) {

        // Let's wire up to the signalr observables
        //
        this.connectionState$ = this.channelService.connectionState$
            .map((state: ConnectionState) => { return ConnectionState[state]; });

        this.channelService.error$.subscribe(
            (error: any) => { console.warn(error); },
            (error: any) => { console.error("errors$ error", error); }
        );

        // Wire up a handler for the starting$ observable to log the
        //  success/fail result
        //
        this.channelService.starting$.subscribe(
            () => { console.log("signalr service has been started"); },
            () => { console.warn("signalr service failed to start!"); }
        );
    }

    ngOnInit() {
        // Start the connection up!
        //
        console.log("Starting the channel service");

        this.channelService.start();
    }
}

We can see in the template we are using two instances of the TaskComponent, and providing them different inputs. We can also see in the template that we're subscribing to a stream that provides updates about the connection state of the channel service. We'll describe this more in a bit.

Outside of the template this component mostly subscribes to observables provided by the channel service, and inside of ngOnInit() it starts the channel service.
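The `.map()` call in the constructor relies on a TypeScript feature that's easy to miss: numeric enums get a reverse mapping, so indexing the enum with a number gives back the member's name. Here's a minimal standalone sketch of just that piece. The `describeState` helper is a hypothetical name used only for illustration and isn't part of the sample app.

```typescript
// Same shape as the ConnectionState enum in channel.service.ts
enum ConnectionState {
    Connecting = 1,
    Connected = 2,
    Reconnecting = 3,
    Disconnected = 4
}

// Hypothetical helper: numeric enums carry a reverse mapping, so
// indexing the enum by its numeric value returns the member's name
function describeState(state: ConnectionState): string {
    return ConnectionState[state];
}

console.log(describeState(ConnectionState.Connected));    // "Connected"
console.log(describeState(ConnectionState.Disconnected)); // "Disconnected"
```

This is why the template can show a readable label like "Connected" rather than the raw number 2.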

Bootstrapping

With Angular 2 you need to define the module for the application, and here's the code in this example:

import { NgModule }       from '@angular/core';  
import { BrowserModule }  from '@angular/platform-browser';  
import { HttpModule }     from '@angular/http';

import "rxjs/add/operator/map";

import {ChannelService, ChannelConfig, SignalrWindow} from "./services/channel.service";

import { AppComponent }   from './app.component';  
import { TaskComponent }  from "./task.component";

let channelConfig = new ChannelConfig();  
channelConfig.url = "http://localhost:9123/signalr";  
channelConfig.hubName = "EventHub";

@NgModule({
  imports:      [ BrowserModule, HttpModule ],
  declarations: [ AppComponent, TaskComponent ],
  providers: [
    ChannelService,
    { provide: SignalrWindow, useValue: window },
    { provide: 'channel.config', useValue: channelConfig }
  ],
  bootstrap:    [ AppComponent ]
})

export class AppModule { }  

Here we're pulling in the services we'll need, but also providing two token-based providers. One provides a type called SignalrWindow that is used to call the SignalR methods within our ChannelService class without relying on global variables. The other drives the behavior of the ChannelService itself.
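One benefit of handing the window to the service rather than reaching for the global `$` is that a hand-written fake can stand in for it, in a unit test for example. The sketch below is a rough illustration in plain TypeScript with no Angular involved. `SignalrWindowLike`, `assertSignalrLoaded` and `fakeWindow` are hypothetical names, and the guard only mirrors the kind of check the service's constructor performs.

```typescript
// Hypothetical minimal shape of what the service needs from the window
interface SignalrWindowLike {
    $?: { hubConnection?: () => unknown };
}

// Mirrors the guard at the top of the ChannelService constructor
function assertSignalrLoaded(win: SignalrWindowLike): void {
    if (win.$ === undefined || win.$.hubConnection === undefined) {
        throw new Error("SignalR scripts do not appear to be loaded");
    }
}

// A hand-written fake standing in for the real browser window
const fakeWindow: SignalrWindowLike = {
    $: { hubConnection: () => ({ url: "" }) }
};

// Passes silently: the fake exposes $.hubConnection
assertSignalrLoaded(fakeWindow);

// An object with no $ at all makes the guard throw
let guardMessage = "";
try {
    assertSignalrLoaded({});
} catch (e) {
    guardMessage = (e as Error).message;
}
console.log(guardMessage); // "SignalR scripts do not appear to be loaded"
```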

The ChannelService

OK, so here's the meat of this example. The ChannelService is meant to wrap the internals of how SignalR works, and expose it using the channel/event model discussed in the first part of this series.

Publicly it exposes the following items:

  • starting$ - An observable that emits an event when the connection is first established
  • connectionState$ - An observable for changes in the state of the SignalR connection
  • error$ - An observable of errors that can occur within SignalR
  • start() - A function to start the connection
  • sub(channel: string) - A function to subscribe to events on a specific channel. This returns an observable that the caller can subscribe to
  • publish(ev: ChannelEvent) - A function to push out events on a given channel (this isn't used in this particular demo though)

<caveat> I am in no way an expert in observables, and am trying really hard to better understand them. If you see something really dumb please tell me in the comments! </caveat>

With that, here's the code for the ChannelService:

import {Injectable, Inject} from "@angular/core";  
import {Subject} from "rxjs/Subject";  
import {Observable} from "rxjs/Observable";

/**
 * When SignalR runs it will add functions to the global $ variable 
 * that you use to create connections to the hub. However, in this
 * class we won't want to depend on any global variables, so this
 * class provides an abstraction away from using $ directly in here.
 */
export class SignalrWindow extends Window {  
    $: any;
}

export enum ConnectionState {  
    Connecting = 1,
    Connected = 2,
    Reconnecting = 3,
    Disconnected = 4
}

export class ChannelConfig {  
    url: string;
    hubName: string;
    channel: string;
}

export class ChannelEvent {  
    Name: string;
    ChannelName: string;
    Timestamp: Date;
    Data: any;
    Json: string;

    constructor() {
        this.Timestamp = new Date();
    }
}

class ChannelSubject {  
    channel: string;
    subject: Subject<ChannelEvent>;
}

/**
 * ChannelService is a wrapper around the functionality that SignalR
 * provides to expose the ideas of channels and events. With this service
 * you can subscribe to specific channels (or groups in signalr speak) and
 * use observables to react to specific events sent out on those channels.
 */
@Injectable()
export class ChannelService {

    /**
     * starting$ is an observable available to know if the signalr 
     * connection is ready or not. On a successful connection this
     * stream will emit a value.
     */
    starting$: Observable<any>;

    /**
     * connectionState$ provides the current state of the underlying
     * connection as an observable stream.
     */
    connectionState$: Observable<ConnectionState>;

    /**
     * error$ provides a stream of any error messages that occur on the 
     * SignalR connection
     */
    error$: Observable<string>;

    // These are used to feed the public observables 
    //
    private connectionStateSubject = new Subject<ConnectionState>();
    private startingSubject = new Subject<any>();
    private errorSubject = new Subject<any>();

    // These are used to track the internal SignalR state 
    //
    private hubConnection: any;
    private hubProxy: any;

    // An internal array to track what channel subscriptions exist 
    //
    private subjects = new Array<ChannelSubject>();

    constructor(
        @Inject(SignalrWindow) private window: SignalrWindow,
        @Inject("channel.config") private channelConfig: ChannelConfig
    ) {
        if (this.window.$ === undefined || this.window.$.hubConnection === undefined) {
            throw new Error("The variable '$' or the .hubConnection() function are not defined...please check the SignalR scripts have been loaded properly");
        }

        // Set up our observables
        //
        this.connectionState$ = this.connectionStateSubject.asObservable();
        this.error$ = this.errorSubject.asObservable();
        this.starting$ = this.startingSubject.asObservable();

        this.hubConnection = this.window.$.hubConnection();
        this.hubConnection.url = channelConfig.url;
        this.hubProxy = this.hubConnection.createHubProxy(channelConfig.hubName);

        // Define handlers for the connection state events
        //
        this.hubConnection.stateChanged((state: any) => {
            let newState = ConnectionState.Connecting;

            switch (state.newState) {
                case this.window.$.signalR.connectionState.connecting:
                    newState = ConnectionState.Connecting;
                    break;
                case this.window.$.signalR.connectionState.connected:
                    newState = ConnectionState.Connected;
                    break;
                case this.window.$.signalR.connectionState.reconnecting:
                    newState = ConnectionState.Reconnecting;
                    break;
                case this.window.$.signalR.connectionState.disconnected:
                    newState = ConnectionState.Disconnected;
                    break;
            }

            // Push the new state on our subject
            //
            this.connectionStateSubject.next(newState);
        });

        // Define handlers for any errors
        //
        this.hubConnection.error((error: any) => {
            // Push the error on our subject
            //
            this.errorSubject.next(error);
        });

        this.hubProxy.on("onEvent", (channel: string, ev: ChannelEvent) => {
            //console.log(`onEvent - ${channel} channel`, ev);

            // This method acts like a broker for incoming messages. We 
            //  check the internal array of subjects to see if one exists
            //  for the channel this came in on, and then emit the event
            //  on it. Otherwise we ignore the message.
            //
            let channelSub = this.subjects.find((x: ChannelSubject) => {
                return x.channel === channel;
            }) as ChannelSubject;

            // If we found a subject then emit the event on it
            //
            if (channelSub !== undefined) {
                return channelSub.subject.next(ev);
            }
        });

    }

    /**
     * Start the SignalR connection. The starting$ stream will emit an 
     * event if the connection is established, otherwise it will emit an
     * error.
     */
    start(): void {
        // Now we only want the connection started once, so we have a special
        //  starting$ observable that clients can subscribe to in order to know
        //  if the startup sequence is done.
        //
        // If we just mapped the start() promise to an observable, then any time
        //  a client subscribed to it the start sequence would be triggered
        //  again since it's a cold observable.
        //
        this.hubConnection.start()
            .done(() => {
                this.startingSubject.next();
            })
            .fail((error: any) => {
                this.startingSubject.error(error);
            });
    }

    /** 
     * Get an observable that will contain the data associated with a specific 
     * channel 
     * */
    sub(channel: string): Observable<ChannelEvent> {

        // Try to find an observable that we already created for the requested 
        //  channel
        //
        let channelSub = this.subjects.find((x: ChannelSubject) => {
            return x.channel === channel;
        }) as ChannelSubject;

        // If we already have one for this event, then just return it
        //
        if (channelSub !== undefined) {
            console.log(`Found existing observable for ${channel} channel`)
            return channelSub.subject.asObservable();
        }

        //
        // If we're here then we don't already have the observable to provide the
        //  caller, so we need to call the server method to join the channel 
        //  and then create an observable that the caller can use to receive
        //  messages.
        //

        // Now we just create our internal object so we can track this subject
        //  in case someone else wants it too
        //
        channelSub = new ChannelSubject();
        channelSub.channel = channel;
        channelSub.subject = new Subject<ChannelEvent>();
        this.subjects.push(channelSub);

        // Now SignalR is asynchronous, so we need to ensure the connection is
        //  established before we call any server methods. So we'll subscribe to 
        //  the starting$ stream since that won't emit a value until the connection
        //  is ready
        //
        this.starting$.subscribe(() => {
            this.hubProxy.invoke("Subscribe", channel)
                .done(() => {
                    console.log(`Successfully subscribed to ${channel} channel`);
                })
                .fail((error: any) => {
                    channelSub.subject.error(error);
                });
        },
            (error: any) => {
                channelSub.subject.error(error);
            });

        return channelSub.subject.asObservable();
    }

    // Not quite sure how to handle this (if at all) since there could be
    //  more than 1 caller subscribed to an observable we created
    //
    // unsubscribe(channel: string): Rx.Observable<any> {
    //     this.observables = this.observables.filter((x: ChannelObservable) => {
    //         return x.channel === channel;
    //     });
    // }

    /** publish provides a way for callers to emit events on any channel. In a 
     * production app the server would ensure that only authorized clients can
     * actually emit the message, but here we're not concerned about that.
     */
    publish(ev: ChannelEvent): void {
        this.hubProxy.invoke("Publish", ev);
    }

}

So there's a fair bit of code, but all this really does is create the SignalR connection, and attempt to map what it provides to observables that users of the service can observe. The sub() function works by creating new Subjects (internally) that are used to broker events sent by the SignalR hub.
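To make the brokering idea concrete, here's a stripped-down sketch of it with no SignalR or RxJS involved. `MiniSubject` is a tiny hand-rolled stand-in for an RxJS Subject, and `MiniBroker` and `ChannelEventLike` are hypothetical names used only for this illustration. The real service also invokes the server's "Subscribe" method and handles errors, which this leaves out.

```typescript
// A tiny stand-in for an RxJS Subject so the sketch has no dependencies
class MiniSubject<T> {
    private handlers: Array<(value: T) => void> = [];

    subscribe(handler: (value: T) => void): void {
        this.handlers.push(handler);
    }

    next(value: T): void {
        this.handlers.forEach(h => h(value));
    }
}

interface ChannelEventLike {
    Name: string;
    Data: any;
}

class MiniBroker {
    // One subject per channel, keyed by channel name
    private subjects: { [channel: string]: MiniSubject<ChannelEventLike> } = {};

    // Like sub(): reuse the channel's subject, or create it on first request
    sub(channel: string): MiniSubject<ChannelEventLike> {
        let subject = this.subjects[channel];
        if (subject === undefined) {
            subject = new MiniSubject<ChannelEventLike>();
            this.subjects[channel] = subject;
        }
        return subject;
    }

    // Like the "onEvent" handler: forward to the channel's subject if one
    // exists, otherwise ignore the message
    onEvent(channel: string, ev: ChannelEventLike): void {
        const subject = this.subjects[channel];
        if (subject !== undefined) {
            subject.next(ev);
        }
    }
}

const broker = new MiniBroker();
const received: string[] = [];
broker.sub("tasks").subscribe(ev => received.push(ev.Name));

broker.onEvent("tasks", { Name: "longTask.status", Data: { State: "starting" } });
broker.onEvent("other", { Name: "ignored", Data: null }); // no subject for this channel

console.log(received); // ["longTask.status"]
```

Keying the subjects by channel name means two callers asking for the same channel share one subject, which is the same reason the service keeps its internal array.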

The idea is that a user of the service just calls sub("myChannel"), and they are provided an observable they can subscribe to in order to receive any events sent on that channel. They then need to decide what to do with any events emitted on the observable. With this knowledge we can return to our TaskComponent:

    ngOnInit() {
        // Get an observable for events emitted on this channel
        //
        this.channelService.sub(this.channel).subscribe(
            (x: ChannelEvent) => {
                switch (x.Name) {
                    case this.eventName: { this.appendStatusUpdate(x); }
                }
            },
            (error: any) => {
                console.warn("Attempt to join channel failed!", error);
            }
        )
    }

We can now see that in our task component we're calling sub(this.channel) and immediately subscribing to the observable we get back. In our subscription we simply check the name of the event provided and append it to the message window if it matches the event the task component was configured for.
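The filter-then-prepend behaviour can be sketched as a small pure function, which makes it easy to reason about without a running hub. This is only an illustration: `appendIfMatching` and `StatusEventLike` are hypothetical names, the timestamp is passed in rather than read from `new Date()` so the output is predictable, and the "in progress" state is an assumed example value rather than something the API is known to send.

```typescript
interface StatusEventLike {
    Name: string;
    Data: { State: string; PercentComplete: number };
}

// Returns the updated message log: events whose name matches are
// prepended as a new line, anything else leaves the log untouched
function appendIfMatching(
    messages: string,
    eventName: string,
    ev: StatusEventLike,
    time: string
): string {
    if (ev.Name !== eventName) {
        return messages;
    }

    const state = ev.Data.State;
    const line = (state === "starting" || state === "complete")
        ? `${time} : ${state}\n`
        : `${time} : ${state} : ${ev.Data.PercentComplete} % complete\n`;

    return line + messages;
}

let messageLog = "";

// Matches the configured event name, so it is shown
messageLog = appendIfMatching(messageLog, "longTask.status",
    { Name: "longTask.status", Data: { State: "starting", PercentComplete: 0 } }, "10:00:00");

// Same channel but a different event name, so it is ignored
messageLog = appendIfMatching(messageLog, "longTask.status",
    { Name: "shortTask.status", Data: { State: "starting", PercentComplete: 0 } }, "10:00:01");

// "in progress" is an assumed state value used only for this example
messageLog = appendIfMatching(messageLog, "longTask.status",
    { Name: "longTask.status", Data: { State: "in progress", PercentComplete: 50 } }, "10:00:02");

console.log(messageLog);
```

Because both task components listen on the same "tasks" channel, this name check is what keeps the long task's updates out of the short task's window and vice versa.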

Pulling in SignalR itself

SignalR's "magic" lives in some JavaScript files provided by Microsoft, so we need to ensure they're included in index.html:

<!DOCTYPE html>  
<html>  
  <head>
     <!-- omitted for brevity -->

    <!-- Include SignalR (& jQuery since it requires it) -->
    <script src="node_modules/jquery/dist/jquery.js"></script>
    <script src="node_modules/signalr/jquery.signalr.js"></script>

     <!-- omitted for brevity -->
</html>  

Wrapping up

In this two-part series I explored how to create a simple API that used SignalR to provide real-time updates on long-running tasks. I then created an Angular 2 client to call the API methods and process the updates using SignalR.

I am still very new to Angular 2, and in particular how to properly use RxJS, so if you have any feedback or suggestions on how this could be improved please let me know in the comments.
