Class: Embulk::Schema

Inherits:
Array
  • Object
show all
Defined in:
embulk-core/src/main/ruby/embulk/schema.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(columns) ⇒ Schema

Returns a new instance of Schema. Raises if a column's explicit index does not match its position, or if a column type is unknown.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'embulk-core/src/main/ruby/embulk/schema.rb', line 7

# Builds a frozen schema from +columns+. It also precompiles, via +eval+, one
# lambda that reads a record from a page reader and one that writes a record
# to a page builder. Generating straight-line code once per schema presumably
# avoids re-dispatching on column type for every record.
#
# Each input column is copied into a new Column whose index is its position
# in +columns+.
#
# @param columns [#map] column-like objects responding to +index+, +name+,
#   +type+ and +format+; +index+ may be nil
# @raise [RuntimeError] in either of these cases:
#   - a column has a non-nil +index+ that differs from its position;
#   - a column +type+ is not one of :boolean, :long, :double, :string,
#     :timestamp or :json.
def initialize(columns)
  # Re-number columns by position, rejecting an explicit index that disagrees.
  columns = columns.map.with_index {|c,index|
    if c.index && c.index != index
      # TODO ignore this error?
      raise "Index of column '#{c.name}' is #{c.index} but it is at column #{index}."
    end
    Column.new(index, c.name, c.type, c.format)
  }
  super(columns)

  # Generated reader: returns an Array with one entry per column, in column
  # order. An entry is nil for a null value and the typed value otherwise.
  # NOTE: each value snippet below has no trailing newline, so a generated
  # line reads e.g. "record << reader.getLong(0)end". That is still valid Ruby.
  record_reader_script =
    "lambda do |reader|\n" <<
    "record = []\n"
  each do |column|
    idx = column.index
    column_script =
      "if reader.isNull(#{idx})\n" <<
      "record << nil\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "record << reader.getBoolean(#{idx})"
      when :long
        "record << reader.getLong(#{idx})"
      when :double
        "record << reader.getDouble(#{idx})"
      when :string
        "record << reader.getString(#{idx})"
      when :timestamp
        # Constructing through Java::org.jruby.RubyTime instead of constructing Ruby's Time directly
        # as Ruby's Time cannot be constructed from nanoseconds as of Ruby 2.4. Ruby's Time might be
        # extended independently from Java::org.jruby.RubyTime, for example, to_msgpack.
        # Java::org.jruby.RubyTime is converted to Ruby's Time by gmtime() as a result.
        # The epoch second and nanoseconds are split into whole milliseconds (for the DateTime)
        # plus the remaining sub-millisecond nanoseconds.
        # TODO: Replace to Ruby's Time.at(seconds, nanoseconds, :nsec) available from Ruby 2.5.0.
        # http://ruby-doc.org/core-2.5.0/Time.html#method-c-at
        "record << (java_timestamp = reader.getTimestamp(#{idx}); Java::org.jruby.RubyTime.newTime(JRuby.runtime, Java::org.joda.time.DateTime.new((java_timestamp.getEpochSecond() * 1000) + (java_timestamp.getNano() / 1000000)), (java_timestamp.getNano() % 1000000)).gmtime())"
      when :json
        # Packs the Java-side value with msgpack-java, then unpacks the bytes with Ruby MessagePack.
        "record << MessagePack.unpack(String.from_java_bytes((::Java::org.msgpack.core.MessagePack.newDefaultBufferPacker()).packValue(reader.getJson(#{idx})).toMessageBuffer().toByteArray()))"
      else
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_reader_script << column_script << "\n"
  end
  record_reader_script << "record\n"
  record_reader_script << "end"
  # The script is built only from integer column indexes and fixed snippets
  # (no column names or other input text is interpolated into it).
  @record_reader = eval(record_reader_script)

  # Generated writer: for each column it calls builder.setNull when
  # record[idx] is nil, and the typed setter otherwise. It then calls
  # builder.addRecord.
  # NOTE(review): java_timestamp_class is assigned inside the generated lambda
  # but is not referenced by any snippet below. Confirm whether it is still needed.
  record_writer_script = "lambda do |builder,record|\n"
  record_writer_script << "java_timestamp_class = ::Embulk::Java::Timestamp\n"
  each do |column|
    idx = column.index
    column_script =
      "if record[#{idx}].nil?\n" <<
      "builder.setNull(#{idx})\n" <<
      "else\n" <<
      case column.type
      when :boolean
        "builder.setBoolean(#{idx}, record[#{idx}])"
      when :long
        "builder.setLong(#{idx}, record[#{idx}])"
      when :double
        "builder.setDouble(#{idx}, record[#{idx}])"
      when :string
        "builder.setString(#{idx}, record[#{idx}])"
      when :timestamp
        # Accepts org.embulk.spi.time.Timestamp, java.time.Instant or Ruby Time.
        # NOTE(review): for any other value the inner case evaluates to nil, and
        # nil is then passed to setTimestamp. Confirm that this is intended.
        "builder.setTimestamp(#{idx}, case record[#{idx}] when Java::org.embulk.spi.time.Timestamp then record[#{idx}] when Java::java.time.Instant then Java::org.embulk.spi.time.Timestamp.ofInstant(record[#{idx}]) when Time then Java::org.embulk.spi.time.Timestamp.ofEpochSecond(record[#{idx}].to_i, record[#{idx}].nsec) end)"
      when :json
        # Round-trips the Ruby value through MessagePack bytes to obtain a msgpack-java Value.
        "builder.setJson(#{idx}, ::Java::org.msgpack.core.MessagePack.newDefaultUnpacker(MessagePack.pack(record[#{idx}]).to_java_bytes).unpackValue())"
      else
        raise "Unknown type #{column.type.inspect}"
      end <<
      "end\n"
    record_writer_script << column_script << "\n"
  end
  record_writer_script << "builder.addRecord\n"
  record_writer_script << "end"
  @record_writer = eval(record_writer_script)

  # Cached per-column views, in column order.
  @names = map {|c| c.name }
  @types = map {|c| c.type }

  # Freeze last: all instance variables must be assigned before the object
  # becomes immutable.
  freeze
end

Instance Attribute Details

#names ⇒ Object (readonly)

Returns the value of attribute names (the column names, in column order).



92
93
94
# File 'embulk-core/src/main/ruby/embulk/schema.rb', line 92

# Column names captured when the schema was built.
#
# @return [Array] one name per column, in column order
def names; @names; end

#types ⇒ Object (readonly)

Returns the value of attribute types (the column types, in column order).



92
93
94
# File 'embulk-core/src/main/ruby/embulk/schema.rb', line 92

# Column types captured when the schema was built.
#
# @return [Array] one type symbol per column, in column order
def types; @types; end

Class Method Details

.from_java(java_schema) ⇒ Object



102
103
104
# File 'embulk-core/src/main/ruby/embulk/schema.rb', line 102

# Builds a Schema from a Java-side schema object.
#
# @param java_schema [Object] object responding to +getColumns+; each
#   element is converted with +Column.from_java+
# @return [Schema] a new schema holding the converted columns, in order
def self.from_java(java_schema)
  ruby_columns = java_schema.getColumns.map { |java_column| Column.from_java(java_column) }
  new(ruby_columns)
end

Instance Method Details

#read_record(page_reader) ⇒ Object



94
95
96
# File 'embulk-core/src/main/ruby/embulk/schema.rb', line 94

# Reads one record from +page_reader+ using the reader lambda compiled for
# this schema.
#
# @param page_reader [Object] the page reader positioned at a record
# @return [Array] whatever the compiled reader lambda returns
def read_record(page_reader)
  @record_reader.(page_reader)
end

#to_java ⇒ Object



106
107
108
109
# File 'embulk-core/src/main/ruby/embulk/schema.rb', line 106

# Converts this schema to its Java counterpart.
#
# @return [Java::Schema] built from each column's +to_java+ result, in order
def to_java
  java_columns = map(&:to_java)
  Java::Schema.new(java_columns)
end

#write_record(page_builder, record) ⇒ Object



98
99
100
# File 'embulk-core/src/main/ruby/embulk/schema.rb', line 98

# Writes +record+ into +page_builder+ using the writer lambda compiled for
# this schema.
#
# @param page_builder [Object] the page builder receiving the values
# @param record [Array] values indexed by column position
# @return [Object] whatever the compiled writer lambda returns
def write_record(page_builder, record)
  @record_writer.(page_builder, record)
end