Akka(33): Http:Marshalling,to Json

  Akka-http是一项系统集成工具。这主要依赖系统之间的数据交换功能。因为程序内数据表达形式与网上传输的数据格式是不相同的,所以需要对程序高级结构化的数据进行转换(marshalling or serializing)成为可在网上传输的数据格式。由于可能涉及到异类系统集成,网上传输数据格式是一个公开的标准,这样大家才都可以进行解析。Json就是是一个目前业界普遍接受的网上交换数据格式。当然,所谓的数据格式转换应该是双向的,还需要包括把接收的网上传输数据转换成程序高级结构化数据。


sealed abstract class Marshaller[-A, +B] { def apply(value: A)(implicit ec: ExecutionContext): Future[List[Marshalling[B]]] def map[C](f: B ⇒ C): Marshaller[A, C] = Marshaller(implicit ec ⇒ value ⇒ this(value).fast map (_ map (_ map f))) ... } //#marshaller-creation object Marshaller extends GenericMarshallers with PredefinedToEntityMarshallers with PredefinedToResponseMarshallers with PredefinedToRequestMarshallers { /** * Creates a [[Marshaller]] from the given function. */ def apply[A, B](f: ExecutionContext ⇒ A ⇒ Future[List[Marshalling[B]]]): Marshaller[A, B] = new Marshaller[A, B] { def apply(value: A)(implicit ec: ExecutionContext) = try f(ec)(value) catch { case NonFatal(e) ⇒ FastFuture.failed(e) } } ... } 构建函数apply[A,B]包嵌了个操作函数:A=>Future[List[Marshalling[B]]],至于为什么不采用更简单直接的方式A=>B是因为:




/** * Describes one possible option for marshalling a given value. */ sealed trait Marshalling[+A] { def map[B](f: A ⇒ B): Marshalling[B] /** * Converts this marshalling to an opaque marshalling, i.e. a marshalling result that * does not take part in content type negotiation. The given charset is used if this * instance is a `WithOpenCharset` marshalling. */ def toOpaque(charset: HttpCharset): Marshalling[A] } object Marshalling { /** * A Marshalling to a specific [[akka.http.scaladsl.model.ContentType]]. */ final case class WithFixedContentType[A]( contentType: ContentType, marshal: () ⇒ A) extends Marshalling[A] { def map[B](f: A ⇒ B): WithFixedContentType[B] = copy(marshal = () ⇒ f(marshal())) def toOpaque(charset: HttpCharset): Marshalling[A] = Opaque(marshal) } /** * A Marshalling to a specific [[akka.http.scaladsl.model.MediaType]] with a flexible charset. */ final case class WithOpenCharset[A]( mediaType: MediaType.WithOpenCharset, marshal: HttpCharset ⇒ A) extends Marshalling[A] { def map[B](f: A ⇒ B): WithOpenCharset[B] = copy(marshal = cs ⇒ f(marshal(cs))) def toOpaque(charset: HttpCharset): Marshalling[A] = Opaque(() ⇒ marshal(charset)) } /** * A Marshalling to an unknown MediaType and charset. * Circumvents content negotiation. */ final case class Opaque[A](marshal: () ⇒ A) extends Marshalling[A] { def map[B](f: A ⇒ B): Opaque[B] = copy(marshal = () ⇒ f(marshal())) def toOpaque(charset: HttpCharset): Marshalling[A] = this } } 我们可以在Marshalling类型里对消息内容类型(message-content-type)进行操作。为了方便操作,Akka-http提供了下面这几个类型别名:

type ToEntityMarshaller[T] = Marshaller[T, MessageEntity] type ToByteStringMarshaller[T] = Marshaller[T, ByteString] type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)] type ToResponseMarshaller[T] = Marshaller[T, HttpResponse] type ToRequestMarshaller[T] = Marshaller[T, HttpRequest] 基本上是以目标数据类型来分类代表的。Akka-http提供了许多类型的预设实例到Mashalling转换:

PredefinedToEntityMarshallers Array[Byte] ByteString Array[Char] String akka.http.scaladsl.model.FormData akka.http.scaladsl.model.MessageEntity T <: akka.http.scaladsl.model.Multipart PredefinedToResponseMarshallers T, if a ToEntityMarshaller[T] is available HttpResponse StatusCode (StatusCode, T), if a ToEntityMarshaller[T] is available (Int, T), if a ToEntityMarshaller[T] is available (StatusCode, immutable.Seq[HttpHeader], T), if a ToEntityMarshaller[T] is available (Int, immutable.Seq[HttpHeader], T), if a ToEntityMarshaller[T] is available PredefinedToRequestMarshallers HttpRequest Uri (HttpMethod, Uri, T), if a ToEntityMarshaller[T] is available (HttpMethod, Uri, immutable.Seq[HttpHeader], T), if a ToEntityMarshaller[T] is available GenericMarshallers Marshaller[Throwable, T] Marshaller[Option[A], B], if a Marshaller[A, B] and an EmptyValue[B] is available Marshaller[Either[A1, A2], B], if a Marshaller[A1, B] and a Marshaller[A2, B] is available Marshaller[Future[A], B], if a Marshaller[A, B] is available Marshaller[Try[A], B], if a Marshaller[A, B] is available


class Marshal[A](val value: A) { /** * Marshals `value` using the first available [[Marshalling]] for `A` and `B` provided by the given [[Marshaller]]. * If the marshalling is flexible with regard to the used charset `UTF-8` is chosen. */ def to[B](implicit m: Marshaller[A, B], ec: ExecutionContext): Future[B] = m(value).fast.map { _.head match { case Marshalling.WithFixedContentType(_, marshal) ⇒ marshal() case Marshalling.WithOpenCharset(_, marshal) ⇒ marshal(HttpCharsets.`UTF-8`) case Marshalling.Opaque(marshal) ⇒ marshal() } } /** * Marshals `value` to an `HttpResponse` for the given `HttpRequest` with full content-negotiation. */ def toResponseFor(request: HttpRequest)(implicit m: ToResponseMarshaller[A], ec: ExecutionContext): Future[HttpResponse] = { import akka.http.scaladsl.marshalling.Marshal._ val ctn = ContentNegotiator(request.headers) m(value).fast.map { marshallings ⇒ val supportedAlternatives: List[ContentNegotiator.Alternative] = marshallings.collect { case Marshalling.WithFixedContentType(ct, _) ⇒ ContentNegotiator.Alternative(ct) case Marshalling.WithOpenCharset(mt, _) ⇒ ContentNegotiator.Alternative(mt) }(collection.breakOut) val bestMarshal = { if (supportedAlternatives.nonEmpty) { ctn.pickContentType(supportedAlternatives).flatMap { case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset | _: ContentType.WithMissingCharset) ⇒ marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal } case best @ ContentType.WithCharset(bestMT, bestCS) ⇒ marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS) } } } else None } orElse { marshallings collectFirst { case Marshalling.Opaque(marshal) ⇒ marshal } } getOrElse { throw UnacceptableResponseContentTypeException(supportedAlternatives.toSet) } bestMarshal() } } } 我们可以用Marshal.to和toResponseFor(request)把Akka-http提供的预设可转换类实例转换成相关的toResponseMarshallable类实例。因为Server-Directive如complete接受一个toResponseMarshallable来构建HttpResponse:

/** * Completes the request using the given arguments. * * @group route */ def complete(m: ⇒ ToResponseMarshallable): StandardRoute = StandardRoute(_.complete(m))


/** Something that can later be marshalled into a response */ trait ToResponseMarshallable { type T def value: T implicit def marshaller: ToResponseMarshaller[T] def apply(request: HttpRequest)(implicit ec: ExecutionContext): Future[HttpResponse] = Marshal(value).toResponseFor(request) } object ToResponseMarshallable { implicit def apply[A](_value: A)(implicit _marshaller: ToResponseMarshaller[A]): ToResponseMarshallable = new ToResponseMarshallable { type T = A def value: T = _value def marshaller: ToResponseMarshaller[T] = _marshaller } implicit val marshaller: ToResponseMarshaller[ToResponseMarshallable] = Marshaller { implicit ec ⇒ marshallable ⇒ marshallable.marshaller(marshallable.value) } } 只要在可视域内(implicit scope)能发现Marshaller[A,B]的隐式实例就能满足complete入参要求了。下面是一些Marshal用例:

import akka.util.ByteString import akka.http.scaladsl.model.{HttpResponse, MessageEntity} import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ object Marshalling { val string = "Yeah" val entityFuture = Marshal(string).to[MessageEntity] val errorMsg = "Easy, pal!" val responseFuture = Marshal(420 -> errorMsg).to[HttpResponse] val request = HttpRequest(headers = List(headers.Accept(MediaTypes.`application/json`))) val responseText = "Plaintext" val respFuture = Marshal(responseText).toResponseFor(request) // with content negotiation! val bsFuture = Marshal("oh my!").to[ByteString] val reqFuture = Marshal("can you?").to[HttpRequest] val resp = reqFuture.flatMap {r => Marshal("ok").toResponseFor(r)} } 那么对于那些自定义的类型U,由于不可能有预设定对应的Marshaller[U,B],应该怎么办?如简单的case class:

case class User(id: Int, name: String) case class Item(id: Int, name: String, price: Double) val john = Marshal(User(1,"John")).to[MessageEntity] val fruit = Marshal(Item(1,"banana", 3.5)).to[MessageEntity] val route = get { path("items") { complete(fruit) } ~ path("users") { complete(john) } }


/** * A special JsonFormat signaling that the format produces a legal JSON root object, i.e. either a JSON array * or a JSON object. */ trait RootJsonFormat[T] extends JsonFormat[T] with RootJsonReader[T] with RootJsonWriter[T] RootJsonFormat[T]代表T类型实例的Json转换。RootJsonFormat[T]的继承父辈包括:

/** * Provides the JSON deserialization and serialization for type T. */ trait JsonFormat[T] extends JsonReader[T] with JsonWriter[T] /** * A special JsonReader capable of reading a legal JSON root object, i.e. either a JSON array or a JSON object. */ @implicitNotFound(msg = "Cannot find RootJsonReader or RootJsonFormat type class for ${T}") trait RootJsonReader[T] extends JsonReader[T] /** * A special JsonWriter capable of writing a legal JSON root object, i.e. either a JSON array or a JSON object. */ @implicitNotFound(msg = "Cannot find RootJsonWriter or RootJsonFormat type class for ${T}") trait RootJsonWriter[T] extends JsonWriter[T] 它们又继承了具体的Json读写工具类:

/** * Provides the JSON deserialization for type T. */ @implicitNotFound(msg = "Cannot find JsonReader or JsonFormat type class for ${T}") trait JsonReader[T] { def read(json: JsValue): T } object JsonReader { implicit def func2Reader[T](f: JsValue => T): JsonReader[T] = new JsonReader[T] { def read(json: JsValue) = f(json) } } /** * Provides the JSON serialization for type T. */ @implicitNotFound(msg = "Cannot find JsonWriter or JsonFormat type class for ${T}") trait JsonWriter[T] { def write(obj: T): JsValue } object JsonWriter { implicit def func2Writer[T](f: T => JsValue): JsonWriter[T] = new JsonWriter[T] { def write(obj: T) = f(obj) } } 它们提供了函数JsValue=>T到JsonReader[T]及T=>JsValue到JsonWriter直接的隐式转换。Akka-http的Json解决方案是典型的type-class模式:是一种可以即兴创建功能的类型继承模式(add-hoc polymorphism)。它的特征就是在可视域内(implicit scope)应不同功能要求提供不同的功能实现类型的隐式实例(implicit instance)。具体用例如下:

trait Formats extends SprayJsonSupport with DefaultJsonProtocol object Converters extends Formats { case class User(id: Int, name: String) case class Item(id: Int, name: String, price: Double) implicit val itemFormat = jsonFormat3(Item.apply) implicit val userFormat = jsonFormat2(User.apply) } jsonFormatXX是Spray-Json提供的Json读写实现。我们把这个隐式实例置于当前可视域内即完成了与Akka-http的对接。我们来看看JsonFormat的定义:

trait ProductFormatsInstances { self: ProductFormats with StandardFormats => // Case classes with 1 parameters def jsonFormat1[P1 :JF, T <: Product :ClassManifest](construct: (P1) => T): RootJsonFormat[T] = { val Array(p1) = extractFieldNames(classManifest[T]) jsonFormat(construct, p1) } def jsonFormat[P1 :JF, T <: Product](construct: (P1) => T, fieldName1: String): RootJsonFormat[T] = new RootJsonFormat[T]{ def write(p: T) = { val fields = new collection.mutable.ListBuffer[(String, JsValue)] fields.sizeHint(1 * 2) fields ++= productElement2Field[P1](fieldName1, p, 0) JsObject(fields: _*) } def read(value: JsValue) = { val p1V = fromField[P1](value, fieldName1) construct(p1V) } } ... } 我们看到了jsonFormat返回结果类型是RootJsonFormat[T]。如果有个case class T,通过jsonFormat可以获得read(value: JsValue)及write(p:T)这两个具体的Json读写函数。Spray-Json提供的预设了Json转换的类型包括下面各类别:

/** * Provides all the predefined JsonFormats. */ trait DefaultJsonProtocol extends BasicFormats with StandardFormats with CollectionFormats with ProductFormats with AdditionalFormats object DefaultJsonProtocol extends DefaultJsonProtocol 例如BasicFormat:

/** * Provides the JsonFormats for the most important Scala types. */ trait BasicFormats { implicit object IntJsonFormat extends JsonFormat[Int] { def write(x: Int) = JsNumber(x) def read(value: JsValue) = value match { case JsNumber(x) => x.intValue case x => deserializationError("Expected Int as JsNumber, but got " + x) } } ... } 这些类型的Json转换已经是具体的read/write操作了。

在SprayJsonSupport trait里有最终的Marshaller[U,B]链接:

/** * A trait providing automatic to and from JSON marshalling/unmarshalling using an in-scope *spray-json* protocol. */ trait SprayJsonSupport { ... implicit def sprayJsonUnmarshaller[T](implicit reader: RootJsonReader[T]): FromEntityUnmarshaller[T] = sprayJsValueUnmarshaller.map(jsonReader[T].read) ... //#sprayJsonMarshallerConverter implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] = sprayJsValueMarshaller compose writer.write ... } 我们在上面提到过FromEntityUnmarshaller[T]和ToEntityMarshaller[T]的是Marshaller[A,B]的别名:

type FromEntityUnmarshaller[T] = Unmarshaller[HttpEntity, T] type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]


"de.heikoseeberger" %% "akka-http-json4s" % "1.19.0-M2", "org.json4s" %% "json4s-jackson" % "3.6.0-M1", "org.json4s" %% "json4s-ext" % "3.6.0-M1",

akka-http-Json4s通过trait Json4sSupport提供了Json4s实现方式:

trait Json4sSupport { ... /** * HTTP entity => `A` * * @tparam A type to decode * @return unmarshaller for `A` */ implicit def unmarshaller[A: Manifest](implicit serialization: Serialization, formats: Formats): FromEntityUnmarshaller[A] = ... /** * `A` => HTTP entity * * @tparam A type to encode, must be upper bounded by `AnyRef` * @return marshaller for any `A` value */ implicit def marshaller[A <: AnyRef](implicit serialization: Serialization, formats: Formats, shouldWritePretty: ShouldWritePretty = ShouldWritePretty.False): ToEntityMarshaller[A] = ...


trait Serialization { import java.io.{Reader, Writer} /** Serialize to String. */ def write[A <: AnyRef](a: A)(implicit formats: Formats): String ... /** Deserialize from a String. */ def read[A](json: String)(implicit formats: Formats, mf: Manifest[A]): A = read(StringInput(json)) ... }


trait Formats extends Serializable { self: Formats => ... def withBigInt: Formats = copy(wWantsBigInt = true) def withLong: Formats = copy(wWantsBigInt = false) def withBigDecimal: Formats = copy(wWantsBigDecimal = true) ... } 看起来我们只需在可视域内提供Serialization和Formats类型的隐式实例就行了:

import de.heikoseeberger.akkahttpjson4s.Json4sSupport import org.json4s.jackson trait JsonCodec extends Json4sSupport { import org.json4s.DefaultFormats import org.json4s.ext.JodaTimeSerializers implicit val serilizer = jackson.Serialization implicit val formats = DefaultFormats ++ JodaTimeSerializers.all } object JsConverters extends JsonCodec 看看具体用例:

import scala.collection.mutable._ case class User(id: Int, name: String) class Item(id: Int, name: String, price: Double) object AnyPic { val area = 10 val title = "a picture" val data = ArrayBuffer[Byte](1,2,3) } val john = Marshal(User(1,"John")).to[MessageEntity] val fruit = Marshal(new Item(1,"banana", 3.5)).to[MessageEntity] val pic = Marshal(AnyPic).to[MessageEntity] 不但省却了重复的JsonFormatXX,而且功能更加灵活强大:因为不再局限于case class这一种自定义类型了,在无需额外代码情况下class,object等全部都支持。



name := "learn-http" version := "0.1" scalaVersion := "2.12.3" libraryDependencies ++= Seq( "de.heikoseeberger" %% "akka-http-json4s" % "1.19.0-M2", "org.json4s" %% "json4s-jackson" % "3.6.0-M1", "org.json4s" %% "json4s-ext" % "3.6.0-M1", "com.typesafe.akka" %% "akka-http" % "10.0.10", "com.typesafe.akka" %% "akka-actor" % "2.5.4", "com.typesafe.akka" %% "akka-stream" % "2.5.4", "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10" ) Marshalling

import akka.actor._ import akka.stream._ import akka.util.ByteString import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshallers.sprayjson._ import spray.json._ trait Formats extends SprayJsonSupport with DefaultJsonProtocol object Converters extends Formats { case class User(id: Int, name: String) case class Item(id: Int, name: String, price: Double) implicit val itemFormat = jsonFormat3(Item.apply) implicit val userFormat = jsonFormat2(User.apply) } object Marshalling { import Converters._ implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher val string = "Yeah" val entityFuture = Marshal(string).to[MessageEntity] val errorMsg = "Easy, pal!" val responseFuture = Marshal(420 -> errorMsg).to[HttpResponse] val request = HttpRequest(headers = List(headers.Accept(MediaTypes.`application/json`))) val responseText = "Plaintext" val respFuture = Marshal(responseText).toResponseFor(request) // val bsFuture = Marshal("oh my!").to[ByteString] // val reqFuture = Marshal(400).to[HttpRequest] // val resp = reqFuture.flatMap {r => Marshal("ok").toResponseFor(r)} val john = Marshal(User(1,"John")).to[MessageEntity] val fruit = Marshal(Item(1,"banana", 3.5)).to[MessageEntity] val route = get { path("items") { complete(fruit) } ~ path("users") { complete(john) } } } Json4sMarshalling

import akka.actor._ import akka.stream._ import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import de.heikoseeberger.akkahttpjson4s.Json4sSupport import org.json4s.jackson trait JsonCodec extends Json4sSupport { import org.json4s.DefaultFormats import org.json4s.ext.JodaTimeSerializers implicit val serilizer = jackson.Serialization implicit val formats = DefaultFormats ++ JodaTimeSerializers.all } object JsConverters extends JsonCodec object Json4sMarshalling { import JsConverters._ implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher val string = "Yeah" val entityFuture = Marshal(string).to[MessageEntity] val errorMsg = "Easy, pal!" val responseFuture = Marshal(420 -> errorMsg).to[HttpResponse] val request = HttpRequest(headers = List(headers.Accept(MediaTypes.`application/json`))) val responseText = "Plaintext" val respFuture = Marshal(responseText).toResponseFor(request) import scala.collection.mutable._ case class User(id: Int, name: String) class Item(id: Int, name: String, price: Double) object AnyPic { val area = 10 val title = "a picture" val data = ArrayBuffer[Byte](1,2,3) } val john = Marshal(User(1,"John")).to[MessageEntity] val fruit = Marshal(new Item(1,"banana", 3.5)).to[MessageEntity] val pic = Marshal(AnyPic).to[MessageEntity] val route = get { path("items") { complete(fruit) } ~ path("users") { complete(john) } ~ path("pic") { complete(pic) } } }

