Apache Ignite: распределенные вычисления в оперативной памяти +22




Привет, Хабр!

Мы продолжаем интересоваться новыми решениями от Apache. Рассчитываем выпустить в мае книгу «High Performance Spark» Холдена Карау (книга в верстке), а в августе — книгу «Kafka: The Definitive Guide» Нии Нархид (еще в переводе). Сегодня же хотим предложить краткую ознакомительную статью об Apache Ignite и оценить масштаб интереса к теме.

Приятного чтения!

Apache Ignite – относительно новое решение, однако, популярность его быстро растет. Сложно отнести его к какому-то конкретному подвиду движков баз данных, поскольку характеристики Ignite роднят его с несколькими инструментами. Основное назначение этого инструмента — хранение распределенных данных в оперативной памяти, а также хранение информации в формате «ключ-значение». Также в Ignite есть некоторые общие функции RDBMS, в частности, поддержка SQL-запросов и ACID-транзакций. Но это не означает, что данное решение – типичная база данных для работы с транзакциями на языке SQL. Здесь не поддерживаются ограничения внешнего ключа, а транзакции доступны лишь на уровне «ключ-значение». Тем не менее, Apache Ignite кажется очень интересным решением.

Apache Ignite легко запустить как узел, встроенный в приложение Spring Boot. Проще всего этого добиться при помощи библиотеки Spring Data Ignite. Apache Ignite реализует интерфейс Spring Data CrudRepository, поддерживающий основные операции CRUD, а также обеспечивающий доступ к гриду Apache Ignite SQL Grid с использованием унифицированных интерфейсов Spring Data. Хотя, в нем обеспечивается сохраняемость данных в дисковом хранилище с поддержкой SQL и парадигмы ACID, мы разработали решение для сохранения объектов кэша оперативной памяти в базу данных MySQL. Архитектура предлагаемого решения показана на рисунке ниже – как видите, она очень проста. Приложение помещает данные в кэш оперативной памяти, устроенный в Apache Ignite. Apache Ignite автоматически синхронизирует эти изменения с базой данных в ходе асинхронной фоновой задачи. Способ считывания данных в этом приложении также не должен вас удивить. Если сущность не кэширована, то она считывается из базы данных и помещается в кэш на будущее.



Здесь я подробно опишу, как разрабатывается приложение такого рода. Результат выложен на GitHub. Я нашел в Интернете еще несколько примеров, но в них затронуты только основы. Я покажу, как сконфигурировать Apache Ignite для записи объектов из кэша в базу данных, а также как создавать более сложные запросы на объединение с использованием нескольких кэшей. Начнем с запуска базы данных.

1. Настраиваем базу данных MySQL


Чтобы запустить базу данных MySQL локально, лучше всего, конечно же, воспользоваться контейнером Docker. База данных MySQL для Docker под Windows в настоящее время доступна по адресу 192.168.99.100:33306.

	docker run -d --name mysql -e MYSQL_DATABASE=ignite -e MYSQL_USER=ignite -e MYSQL_PASSWORD=ignite123 -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -p 33306:3306 mysql

Далее создаем таблицы, используемые сущностями приложения для хранения данных: PERSON, CONTACT. Они относятся к таблицам как 1…N, где в таблице CONTACT содержится внешний ключ, указывающий на PERSON id.

	CREATE TABLE `person` (
  `id` int(11) NOT NULL,
  `first_name` varchar(45) DEFAULT NULL,
  `last_name` varchar(45) DEFAULT NULL,
  `gender` varchar(10) DEFAULT NULL,
  `country` varchar(10) DEFAULT NULL,
  `city` varchar(20) DEFAULT NULL,
  `address` varchar(45) DEFAULT NULL,
  `birth_date` date DEFAULT NULL,
  PRIMARY KEY (`id`)
);
 
CREATE TABLE `contact` (
  `id` int(11) NOT NULL,
  `location` varchar(45) DEFAULT NULL,
  `contact_type` varchar(10) DEFAULT NULL,
  `person_id` int(11) NOT NULL,
  PRIMARY KEY (`id`)
);
 
ALTER TABLE `ignite`.`contact` ADD INDEX `person_fk_idx` (`person_id` ASC);
ALTER TABLE `ignite`.`contact`
ADD CONSTRAINT `person_fk` FOREIGN KEY (`person_id`) REFERENCES `ignite`.`person` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;

2. Конфигурируем Maven


Чтобы приступить к работе с репозиторием Spring Data для Apache Ignite, проще всего добавить следующую зависимость Maven в файл pom.xml нашего приложения. Все остальные зависимости Ignite будут включены автоматически. Нам также понадобится драйвер MySQL JDBC driver и зависимости Spring JDBC, чтобы сконфигурировать соединение с базой данных. Они необходимы, так как мы встраиваем Apache Ignite в приложение, и требуется подключиться к базе данных MySQL, чтобы можно было синхронизировать кэш с таблицами базы данных.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>org.apache.ignite</groupId>
   <artifactId>ignite-spring-data</artifactId>
   <version>${ignite.version}</version>
</dependency>

3. Конфигурируем узел Ignite


Класс IgniteConfiguration позволяет сконфигурировать все доступные настройки узла Ignite. В данном случае наиболее важна конфигурация кэша (1). Следует добавить главный ключ и классы сущностей как индексированные типы (2). Далее нужно предусмотреть экспорт обновлений кэша в базу данных (3) и считывать из базы данных ту информацию, которой не окажется в кэше (4). Взаимодействие между узлом Ignite и MySQL можно сконфигурировать при помощи класса CacheJdbcPojoStoreFactory (5). Там нужно передать DataSource @Bean (6), диалект (7) и соответствие между полями объекта и столбцами таблицы (8).

@Bean
public Ignite igniteInstance() {
   IgniteConfiguration cfg = new IgniteConfiguration();
   cfg.setIgniteInstanceName("ignite-1");
   cfg.setPeerClassLoadingEnabled(true);
 
   CacheConfiguration<Long, Contact> ccfg2 = new CacheConfiguration<>("ContactCache"); // (1)
   ccfg2.setIndexedTypes(Long.class, Contact.class); // (2)
   ccfg2.setWriteBehindEnabled(true);
   ccfg2.setWriteThrough(true); // (3)
   ccfg2.setReadThrough(true); // (4)
   CacheJdbcPojoStoreFactory<Long, Contact> f2 = new CacheJdbcPojoStoreFactory<>(); // (5)
   f2.setDataSource(datasource); // (6)
   f2.setDialect(new MySQLDialect()); // (7)
   JdbcType jdbcContactType = new JdbcType(); // (8)
   jdbcContactType.setCacheName("ContactCache");
   jdbcContactType.setKeyType(Long.class);
   jdbcContactType.setValueType(Contact.class);
   jdbcContactType.setDatabaseTable("contact");
   jdbcContactType.setDatabaseSchema("ignite");
   jdbcContactType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcContactType.setValueFields(new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type"), new JdbcTypeField(Types.VARCHAR, "location", String.class, "location"), new JdbcTypeField(Types.INTEGER, "person_id", Long.class, "personId"));
   f2.setTypes(jdbcContactType);
   ccfg2.setCacheStoreFactory(f2);
 
   CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>("PersonCache");
   ccfg.setIndexedTypes(Long.class, Person.class);
   ccfg.setWriteBehindEnabled(true);
   ccfg.setReadThrough(true);
   ccfg.setWriteThrough(true);
   CacheJdbcPojoStoreFactory<Long, Person> f = new CacheJdbcPojoStoreFactory<>();
   f.setDataSource(datasource);
   f.setDialect(new MySQLDialect());
   JdbcType jdbcType = new JdbcType();
   jdbcType.setCacheName("PersonCache");
   jdbcType.setKeyType(Long.class);
   jdbcType.setValueType(Person.class);
   jdbcType.setDatabaseTable("person");
   jdbcType.setDatabaseSchema("ignite");
   jdbcType.setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Long.class, "id"));
   jdbcType.setValueFields(new JdbcTypeField(Types.VARCHAR, "first_name", String.class, "firstName"), new JdbcTypeField(Types.VARCHAR, "last_name", String.class, "lastName"), new JdbcTypeField(Types.VARCHAR, "gender", Gender.class, "gender"), new JdbcTypeField(Types.VARCHAR, "country", String.class, "country"), new JdbcTypeField(Types.VARCHAR, "city", String.class, "city"), new JdbcTypeField(Types.VARCHAR, "address", String.class, "address"), new JdbcTypeField(Types.DATE, "birth_date", Date.class, "birthDate"));
   f.setTypes(jdbcType);
   ccfg.setCacheStoreFactory(f);
 
   cfg.setCacheConfiguration(ccfg, ccfg2);
   return Ignition.start(cfg);
}

Вот конфигурация источника данных Spring для MySQL в виде контейнера Docker.

spring:
datasource:
name: mysqlds
url: jdbc:mysql://192.168.99.100:33306/ignite?useSSL=false
username: ignite
password: ignite123


Здесь необходимо отметить, что Apache Ignite не лишен некоторых недостатков. Например, он отображает Enum на целое число и берет его порядковое значение, хотя, конфигурирует VARCHAR как тип JDCB. Когда такой ряд считывается из базы данных, он неверно отображается на Enum в объекте – в этом поле отклика у вас получится null.

	new JdbcTypeField(Types.VARCHAR, "contact_type", ContactType.class, "type")

4. Объекты модели


Как упоминалось выше, в схеме нашей базы данных две таблицы. Еще есть два класса модели и две конфигурации кэша, по одной на каждый класс модели. Ниже приведена реализация класса модели. Одна из самых интересных вещей, которую здесь стоит отметить – генерация ID при помощи класса AtomicLong. Это один из базовых компонентов Ignite, который служит генератором последовательностей. Также видим специфичную аннотацию @QuerySqlField; если она сопровождает поле – это означает, что данное поле может использоваться в SQL как параметр запроса.

	@QueryGroupIndex.List(
   @QueryGroupIndex(name="idx1")
)
public class Person implements Serializable {
 
   private static final long serialVersionUID = -1271194616130404625L;
   private static final AtomicLong ID_GEN = new AtomicLong();
 
   @QuerySqlField(index = true)
   private Long id;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 0)
   private String firstName;
   @QuerySqlField(index = true)
   @QuerySqlField.Group(name = "idx1", order = 1)
   private String lastName;
   private Gender gender;
   private Date birthDate;
   private String country;
   private String city;
   private String address;
   private List<Contact> contacts = new ArrayList<>();
 
   public void init() {
      this.id = ID_GEN.incrementAndGet();
   }
 
   public Long getId() {
      return id;
   }
 
   public void setId(Long id) {
      this.id = id;
   }
 
   public String getFirstName() {
      return firstName;
   }
 
   public void setFirstName(String firstName) {
      this.firstName = firstName;
   }
 
   public String getLastName() {
      return lastName;
   }
 
   public void setLastName(String lastName) {
      this.lastName = lastName;
   }
 
   public Gender getGender() {
      return gender;
   }
 
   public void setGender(Gender gender) {
      this.gender = gender;
   }
 
   public Date getBirthDate() {
      return birthDate;
   }
 
   public void setBirthDate(Date birthDate) {
      this.birthDate = birthDate;
   }
 
   public String getCountry() {
      return country;
   }
 
   public void setCountry(String country) {
      this.country = country;
   }
 
   public String getCity() {
      return city;
   }
 
   public void setCity(String city) {
      this.city = city;
   }
 
   public String getAddress() {
      return address;
   }
 
   public void setAddress(String address) {
      this.address = address;
   }
 
   public List<Contact> getContacts() {
      return contacts;
   }
 
   public void setContacts(List<Contact> contacts) {
      this.contacts = contacts;
   }
 
}

5. Репозитории Ignite


Полагаю, вам известно, как в Spring Data JPA создаются репозитории. Обработка репозиториев должна быть обеспечена в классе main или @Configuration.

@SpringBootApplication
@EnableIgniteRepositories
public class IgniteRestApplication {
 
   @Autowired
   DataSource datasource;
 
   public static void main(String[] args) {
    SpringApplication.run(IgniteRestApplication.class, args);
   }
 
   // ...
}

Затем расширяем наш интерфейс @Repository базовым интерфейсом CrudRepository. Он поддерживает только унаследованные методы с параметром id. В приведенном ниже фрагменте PersonRepository я определил несколько поисковых методов, воспользовавшись соглашениями об именовании, принятыми в v Spring Data, и запросами Ignite. Эти примеры демонстрируют, что можно вернуть в результатах запроса либо полный объект, либо избранные поля из него — в зависимости от того, что нам требуется.

@RepositoryConfig(cacheName = "PersonCache")
public interface PersonRepository extends IgniteRepository<Person, Long> {
 
    List<Person> findByFirstNameAndLastName(String firstName, String lastName);
 
    @Query("SELECT c.* FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<Contact> selectContacts(String firstName, String lastName);
 
    @Query("SELECT p.id, p.firstName, p.lastName, c.id, c.type, c.location FROM Person p JOIN \"ContactCache\".Contact c ON p.id=c.personId WHERE p.firstName=? and p.lastName=?")
    List<List<?>> selectContacts2(String firstName, String lastName);
}

6. API и тестирование


Вот теперь можно внедрить компоненты репозитория в классы контроллеров REST. API предоставит методы для добавления новых объектов в кэш, обновления или удаления имеющихся объектов, а также для поиска по первичному ключу или по другим, более сложным индексам.

@RestController
@RequestMapping("/person")
public class PersonController {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);
 
    @Autowired
    PersonRepository repository;
 
    @PostMapping
    public Person add(@RequestBody Person person) {
        person.init();
        return repository.save(person.getId(), person);
    }
 
    @PutMapping
    public Person update(@RequestBody Person person) {
        return repository.save(person.getId(), person);
    }
 
    @DeleteMapping("/{id}")
    public void delete(Long id) {
        repository.delete(id);
    }
 
    @GetMapping("/{id}")
    public Person findById(@PathVariable("id") Long id) {
        return repository.findOne(id);
    }
 
    @GetMapping("/{firstName}/{lastName}")
    public List<Person> findByName(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        return repository.findByFirstNameAndLastName(firstName, lastName);
    }
 
    @GetMapping("/contacts/{firstName}/{lastName}")
    public List<Person> findByNameWithContacts(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        List<Person> persons = repository.findByFirstNameAndLastName(firstName, lastName);
        List<Contact> contacts = repository.selectContacts(firstName, lastName);
        persons.stream().forEach(it -> it.setContacts(contacts.stream().filter(c -> c.getPersonId().equals(it.getId())).collect(Collectors.toList())));
        LOGGER.info("PersonController.findByIdWithContacts: {}", contacts);
        return persons;
    }
 
    @GetMapping("/contacts2/{firstName}/{lastName}")
    public List<Person> findByNameWithContacts2(@PathVariable("firstName") String firstName, @PathVariable("lastName") String lastName) {
        List<List<?>> result = repository.selectContacts2(firstName, lastName);
        List<Person> persons = new ArrayList<>();
        for (List<?> l : result) {
            persons.add(mapPerson(l));
        }
        LOGGER.info("PersonController.findByIdWithContacts: {}", result);
        return persons;
    }
 
    private Person mapPerson(List<?> l) {
        Person p = new Person();
        Contact c = new Contact();
        p.setId((Long) l.get(0));
        p.setFirstName((String) l.get(1));
        p.setLastName((String) l.get(2));
        c.setId((Long) l.get(3));
        c.setType((ContactType) l.get(4));
        c.setLocation((String) l.get(4));
        p.addContact(c);
        return p;
    }
 
}

Конечно же, важно проверить производительность созданного решения, особенно когда оно связано с хранением распределенных данных в оперативной памяти и с базами данных. Для этого я написал несколько junit-тестов, помещающих в кэш большое количество объектов, а затем вызывающих методы поиска (для ввода используются случайные данные) – так проверяется производительность запросов. Вот метод, генерирующий много объектов Person и Contact и помещающий их в кэш при помощи API конечных точек.

@Test
public void testAddPerson() throws InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();
    for (int j = 0; j < 10; j++) { es.execute(() -> {
        TestRestTemplate restTemplateLocal = new TestRestTemplate();
            Random r = new Random();
            for (int i = 0; i < 1000000; i++) {
                Person p = restTemplateLocal.postForObject("http://localhost:8090/person", createTestPerson(), Person.class);
                int x = r.nextInt(6);
                for (int k = 0; k < x; k++) {
                    restTemplateLocal.postForObject("http://localhost:8090/contact", createTestContact(p.getId()), Contact.class);
                }
            }
        });
    }
    es.shutdown();
    es.awaitTermination(60, TimeUnit.MINUTES);
}

В Spring Boot предоставляются методы для взятия основных характеристик, позволяющих судить о скорости отклика API. Чтобы активировать такую возможность, нужно включить в зависимости Spring Actuator. Конечная точка Metrics доступна по адресу localhost:8090/metrics. Она не только показывает, сколько времени затрачивает на работу каждый метод API, но и выводит статистику по таким показателям, как количество действующих потоков или свободная память.

7. Запуск приложения


Теперь запустим получившееся у нас приложение, в которое встроен узел Apache Ignite. Я учел советы по повышению производительности, которые содержатся в документации Ignite и определил конфигурацию JVM, показанную ниже.

	java -jar -Xms512m -Xmx1024m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC target/ignite-rest-service-1.0-SNAPSHOT.jar

Теперь можно запустить тестовый класс JUnit IgniteRestControllerTest. Он помещает в кэш некоторое количество данных, а затем вызывает методы поиска. Даны параметры для тестов, где в кэше использованы 1M объектов Person и 2.5M объектов Contact. Каждый из методов поиска выполняется в среднем за 1 мс.

	{
"mem": 624886,
"mem.free": 389701,
"processors": 4,
"instance.uptime": 2446038,
"uptime": 2466661,
"systemload.average": -1,
"heap.committed": 524288,
"heap.init": 524288,
"heap.used": 133756,
"heap": 1048576,
"threads.peak": 107,
"threads.daemon": 25,
"threads.totalStarted": 565,
"threads": 80,
...
"gauge.response.person.contacts.firstName.lastName": 1,
"gauge.response.contact": 1,
"gauge.response.person.firstName.lastName": 1,
"gauge.response.contact.location.location": 1,
"gauge.response.person.id": 1,
"gauge.response.person": 0,
"counter.status.200.person.id": 1000,
"counter.status.200.person.contacts.firstName.lastName": 1000,
"counter.status.200.person.firstName.lastName": 1000,
"counter.status.200.contact": 2500806,
"counter.status.200.person": 1000000,
"counter.status.200.contact.location.location": 1000
}




К сожалению, не доступен сервер mySQL