标签归档:rxscala

Scala中使用Hystrix Command(Non-Blocking)

直接看代码吧

package commands

import javax.inject.Inject

import com.netflix.hystrix.HystrixObservableCommand.Setter
import com.netflix.hystrix._
import play.api.Logger
import play.api.cache.Cache
import play.api.libs.json.{Json, JsValue}
import rx.lang.scala.JavaConversions
import rx.{Subscriber, Observable}
import rx.Observable.OnSubscribe
import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.concurrent.{Promise, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.Play.current

import scala.reflect.ClassTag

/**
  * Created by nealmi on 1/12/16.
  */

object HystrixBridge {

  /**
    * 通过Scala Promise 和 Future 以及 RxScala 桥接 RxJava 以实现no-blocking处理, 方便以Scala的方式使用Hystrix
    *
    * @param groupKey   用来分组,比如所有请求kanche-api的调用使用 kanche-api 分成一组
    * @param commandKey 通常用来描述实际操作,比如:retrieve opened cites
    * @param command    实际处理函数
    * @param fallback   在command()出现异常的时候调用
    * @tparam T 泛型参数
    * @return Future[T]
    */
  def async[T](groupKey: String, commandKey: String, command: () => Future[T], fallback: () => Future[T]) = {
    val setter = Setter
      .withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
      .andCommandKey(HystrixCommandKey.Factory.asKey(commandKey))

    val observable = new HystrixObservableCommand[T](setter) {
      override def construct(): Observable[T] = {
        Observable.create(new OnSubscribe[T] {
          override def call(observer: Subscriber[_ >: T]): Unit = {
            if (!observer.isUnsubscribed()) {
              val future = command()
              future onSuccess {
                case result =>
                  observer.onNext(result)
                  observer.onCompleted()
              }

              future onFailure {
                case t =>
                  observer.onError(t)
              }
            }
          }
        })
      }

      override def resumeWithFallback(): Observable[T] = {
        Observable.create(new OnSubscribe[T] {
          override def call(observer: Subscriber[_ >: T]): Unit = {
            if (!observer.isUnsubscribed()) {
              val future = fallback()
              future onSuccess {
                case result =>
                  observer.onNext(result)
                  observer.onCompleted()
              }

              future onFailure {
                case t =>
                  observer.onError(t)
              }
            }
          }
        })
      }
    }.observe()


    val scalaObservable = JavaConversions.toScalaObservable(observable)

    val promise = Promise[T]

    scalaObservable.subscribe(
      x => promise.success(x),
      e => promise.failure(e),
      () => ()
    )
    promise.future
  }
}